This is an automated email from the ASF dual-hosted git repository.

SpriCoder pushed a commit to branch fs/inner-view
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 068425994459ac28c90fca4a7c0cc9be2e51722c
Author: spricoder <[email protected]>
AuthorDate: Thu Apr 30 18:31:33 2026 +0800

    add tee
---
 .../plans/2026-04-29-cli-filesystem-mode.md        |  39 +++
 .../specs/2026-04-29-cli-filesystem-mode-design.md | 102 +++++++
 .../src/main/java/org/apache/iotdb/cli/Cli.java    |   2 +-
 .../org/apache/iotdb/cli/fs/FilesystemShell.java   |  84 +++++-
 .../iotdb/cli/fs/command/FilesystemCommand.java    |   1 +
 .../cli/fs/command/FilesystemCommandParser.java    |  13 +
 .../fs/provider/FilesystemMutationProvider.java    |   3 +
 .../cli/fs/provider/TableCsvAppendPlanner.java     | 304 +++++++++++++++++++++
 .../provider/TableFilesystemMutationProvider.java  |  17 ++
 .../UnsupportedFilesystemMutationProvider.java     |   6 +
 .../org/apache/iotdb/cli/utils/JlineUtils.java     |   3 +-
 .../apache/iotdb/cli/fs/FilesystemShellTest.java   |  54 ++++
 .../fs/command/FilesystemCommandParserTest.java    |  20 ++
 .../TableFilesystemMutationProviderTest.java       |  57 ++++
 14 files changed, 701 insertions(+), 4 deletions(-)

diff --git a/docs/superpowers/plans/2026-04-29-cli-filesystem-mode.md 
b/docs/superpowers/plans/2026-04-29-cli-filesystem-mode.md
index 081a5ce8748..0a85d3525b5 100644
--- a/docs/superpowers/plans/2026-04-29-cli-filesystem-mode.md
+++ b/docs/superpowers/plans/2026-04-29-cli-filesystem-mode.md
@@ -102,6 +102,7 @@ These notes capture follow-up implementation experience for 
quickly resuming thi
   - `mkdir /db` creates a database.
   - `rm /db/table.csv` drops a table.
   - `mv /db/t1.csv /db/t2.csv` renames a table inside the same database.
+  - `tee -a /db/table.csv` appends CSV records as table rows.
   - Forbid `rm` or `mv` of `/db/table.schema` and `/db/table.meta`.
   - Forbid `rm /db`.
   - Forbid cross-database rename such as `mv /db1/t.csv /db2/t.csv`.
@@ -117,6 +118,43 @@ These notes capture follow-up implementation experience 
for quickly resuming thi
   the server returned `550`; the unchecked propagation exited the CLI. Keep a 
regression test for
   this behavior.
 
+## Data Append Design
+
+The first table data-write implementation supports only append semantics over 
CSV table files:
+
+```bash
+tee -a /db/table.csv
+```
+
+Design decisions already approved:
+
+- Only append writes are supported. Truncate, overwrite, random writes, row 
deletion, and operating
+  system redirection such as `>> /db/table.csv` are out of scope.
+- The target must be a table-mode data file `/<database>/<table>.csv`.
+- `tee` without `-a`, sidecar files, legacy column paths, and tree-mode paths 
are rejected.
+- Writes require `--access_mode filesystem --sql_dialect table --fs_write_mode 
enabled`.
+- Non-interactive mode reads CSV from stdin until EOF and then submits.
+- Interactive mode enters an append buffer. `:wq` validates and submits, `:q!` 
discards, and `:q`
+  exits only if the buffer is empty.
+- The first version does not support `:w`.
+- CSV parsing should use Apache Commons CSV or another proven parser already 
available to the
+  module.
+- Header and headerless input are both supported.
+- Header input may write a subset of columns but must include `time`.
+- Headerless input must provide all columns in the current `/db/table.csv` 
output order.
+- Every record must explicitly provide `time`; `time` cannot be empty or `\N`.
+- `\N` is the explicit SQL `NULL` marker for non-time columns. Empty fields 
are not automatically
+  NULL.
+- Client-side validation covers path, write mode, CSV shape, known columns, 
and required `time`.
+- IoTDB performs type validation, timestamp-format validation, permissions 
checks, and final insert
+  semantics.
+- Validated records map to SQL of the form
+  `INSERT INTO db.table(col1,col2,...) VALUES (...), (...), ...`.
+- Insert statements should be chunked, for example 1000 records per statement.
+- The full buffer must pass client validation before any insert is attempted.
+- Server-side insert failure is not compensated or retried automatically. 
Interactive mode keeps the
+  buffer; non-interactive mode exits with failure.
+
 ## Supported Command Quick Reference
 
 | Command | Description | Example |
@@ -142,6 +180,7 @@ These notes capture follow-up implementation experience for 
quickly resuming thi
 | `mkdir <path>` | Write-gated; in table mode with writes enabled, creates a 
database. | `mkdir /newdb` |
 | `rm <path>` | Write-gated; in table mode with writes enabled, only table CSV 
drop is allowed. | `rm /db/table.csv` |
 | `mv <source> <target>` | Write-gated; in table mode with writes enabled, 
only same-database table CSV rename is allowed. | `mv /db/t1.csv /db/t2.csv` |
+| `tee -a <path>` | Write-gated; in table mode with writes enabled, appends 
CSV input to a table data file. | `tee -a /db/table.csv` |
 | `help` | Print filesystem-mode help. | `help` |
 | `exit` / `quit` | Exit filesystem mode. | `exit` |
 
diff --git a/docs/superpowers/specs/2026-04-29-cli-filesystem-mode-design.md 
b/docs/superpowers/specs/2026-04-29-cli-filesystem-mode-design.md
index 2de2d8a71c6..baeee519fd2 100644
--- a/docs/superpowers/specs/2026-04-29-cli-filesystem-mode-design.md
+++ b/docs/superpowers/specs/2026-04-29-cli-filesystem-mode-design.md
@@ -219,6 +219,7 @@ semantics wherever the same command exists. Provider 
support can still vary by d
 | `mkdir <path>` | Write-gated command. With table mode and `--fs_write_mode 
enabled`, `mkdir /db` creates a database. Otherwise it returns a read-only or 
unsupported error. | `mkdir /newdb` |
 | `rm <path>` | Write-gated command. With table mode and `--fs_write_mode 
enabled`, only `rm /db/table.csv` is allowed and maps to table drop. | `rm 
/db/table.csv` |
 | `mv <source> <target>` | Write-gated command. With table mode and 
`--fs_write_mode enabled`, only same-database table CSV rename is allowed. | 
`mv /db/t1.csv /db/t2.csv` |
+| `tee -a <path>` | Write-gated append command. With table mode and 
`--fs_write_mode enabled`, only `tee -a /db/table.csv` appends CSV records as 
rows. | `tee -a /db/table.csv` |
 | `help` | Print filesystem-mode help. | `help` |
 | `exit` / `quit` | Exit filesystem mode. | `exit` |
 
@@ -251,6 +252,7 @@ Raw SQL should be run in the default SQL access mode.
 | `stat /db/table.meta` | `SHOW TABLES FROM db`, filtered to the table and 
rendered as filesystem metadata for the metadata sidecar. |
 | `cat /db/table.csv` | `SELECT * FROM db.table LIMIT <limit>`, formatted as 
CSV records. |
 | `cut -d, -f2,3 /db/table.csv` | Delimiter-based text field projection over 
the CSV records. |
+| `tee -a /db/table.csv` | Parse CSV input with Apache Commons CSV, validate 
columns and required `time`, then execute chunked `INSERT INTO db.table(...) 
VALUES ...`. |
 | `cat /db/table.schema` | `DESC db.table DETAILS`, formatted as CSV with 
IoTDB result columns preserved. |
 | `cat /db/table.meta` | `SHOW TABLES DETAILS FROM db`, filtered to the table 
and formatted as CSV with IoTDB result columns preserved. |
 | `stat /db/table/col` | Legacy compatibility: `DESC db.table DETAILS`, 
filtered to the column. |
@@ -321,6 +323,102 @@ example, if `cat time` is resolved from `/testtest` to 
`/testtest/time`, table m
 table path and may receive a server error such as `550: Table 'testtest.time' 
does not exist`. That
 error should be printed as `cat: 550: ...`, then the prompt should continue.
 
+## Append Data Write Semantics
+
+Table data writes use Unix append semantics over the table CSV file:
+
+```bash
+tee -a /db/table.csv
+```
+
+Only append writes are in scope for the first data-write implementation. 
Truncation, overwrite,
+random writes, partial row deletion, and shell redirection such as `>> 
/db/table.csv` are out of
+scope because filesystem mode is an IoTDB CLI command surface, not an 
operating-system mount.
+
+Append writes are allowed only when all of these conditions hold:
+
+- CLI is running with `--access_mode filesystem`.
+- CLI is running with `--sql_dialect table`.
+- CLI is running with `--fs_write_mode enabled`.
+- Target path is exactly a table data file of the form 
`/<database>/<table>.csv`.
+- Command is `tee -a`; `tee` without `-a` is rejected.
+
+The sidecar files remain read-only metadata views:
+
+- `tee -a /db/table.schema` is rejected.
+- `tee -a /db/table.meta` is rejected.
+- Legacy column paths such as `/db/table/col` are not writable append targets.
+- Tree-mode paths remain non-writable even when `--fs_write_mode enabled` is 
set.
+
+### Input Modes
+
+Non-interactive mode reads CSV from standard input until EOF, then submits 
once:
+
+```bash
+printf 'time,key,value\n1,spricoder,2.0\n' | \
+  start-cli.sh -h 127.0.0.1 -p 6667 -u root -pw root \
+  --sql_dialect table --access_mode filesystem --fs_write_mode enabled \
+  -e 'tee -a /db/table.csv'
+```
+
+Interactive mode enters an append buffer:
+
+```text
+IoTDB:fs> tee -a /db/table.csv
+time,key,value
+1,spricoder,2.0
+2,spricoder,1.5
+:wq
+```
+
+The append buffer supports a minimal Vim-style command set:
+
+- `:wq`: validate, submit, and exit the append buffer after a successful write.
+- `:q!`: discard the buffer and exit.
+- `:q`: exit only when the buffer is empty; otherwise print guidance to use 
`:wq` or `:q!`.
+
+The first version intentionally does not support `:w`, because writing while 
keeping the buffer
+open creates ambiguity around repeated submissions and buffer clearing.
+
+### CSV Input Rules
+
+Append input is CSV and is parsed with a proven CSV parser, not by ad hoc 
string splitting.
+
+- Both header and headerless input are supported.
+- Header input may provide a subset of columns, but it must include `time`.
+- Headerless input must provide all columns in the current `/db/table.csv` 
output order.
+- Every appended record must explicitly provide `time`.
+- `time` must not be empty and must not be `\N`.
+- `\N` represents SQL `NULL` for non-time columns.
+- Empty fields are not automatically treated as `NULL`.
+- Client-side validation checks path, write mode, CSV shape, known columns, 
and required `time`.
+- IoTDB remains responsible for type validation, timestamp-format validation, 
permissions, and
+  final insert semantics.
+
+### SQL Mapping And Failure Semantics
+
+The provider should map validated append input to table-model SQL:
+
+```sql
+INSERT INTO db.table(col1,col2,...) VALUES (...), (...), ...
+```
+
+Implementation rules:
+
+- Use `DESC db.table DETAILS` or an equivalent schema query to get the table 
columns and output
+  order.
+- Use SQL identifier rendering and SQL literal escaping helpers; never 
concatenate raw user values
+  into SQL.
+- Convert `\N` to `NULL`.
+- Submit rows in fixed-size chunks, for example 1000 records per `INSERT`, to 
avoid oversized SQL
+  statements.
+- Client-side validation must pass for the full buffer before any write is 
attempted.
+- If a server-side insert fails, the CLI does not compensate or retry 
automatically. Server state is
+  whatever IoTDB actually committed.
+- In interactive mode, a server-side failure keeps the append buffer so the 
user can correct input
+  or abandon it with `:q!`.
+- In non-interactive mode, a failure prints a command-prefixed error and exits 
with failure.
+
 ## Proposed Code Structure
 
 New code should live under `org.apache.iotdb.cli.fs`.
@@ -332,6 +430,7 @@ New code should live under `org.apache.iotdb.cli.fs`.
 - `node/FsNode`, `node/FsNodeType`, `node/FsNodeMetadata`: typed metadata 
model.
 - `provider/FilesystemSchemaProvider`: schema and data read provider interface.
 - `provider/FilesystemMutationProvider`: write-gated mutation provider 
interface.
+- `provider/TableCsvAppendPlanner` or equivalent helper: CSV append validation 
and SQL planning.
 - `provider/TreeFilesystemSchemaProvider`: tree-model SQL mapping.
 - `provider/TableFilesystemSchemaProvider`: table-model SQL mapping.
 - `provider/TableFilesystemMutationProvider`: opt-in table-model write mapping.
@@ -363,6 +462,8 @@ The mutation provider owns the current write-gated 
operations:
 - `mkdir(FsPath path)`
 - `remove(FsPath path)`
 - `move(FsPath source, FsPath target)`
+- `append(FsPath path, List<String> csvLines)` or an equivalent table-mode 
append method for
+  `tee -a /db/table.csv`
 
 When writes are disabled, `FilesystemShell` rejects `mkdir`, `rm`, and `mv` 
before calling the
 mutation provider. When writes are enabled, table mode uses 
`TableFilesystemMutationProvider`; tree
@@ -373,6 +474,7 @@ The current table-model write boundary is intentionally 
narrow:
 - `mkdir /db` creates a database directory.
 - `rm /db/table.csv` drops the table data file and therefore the table.
 - `mv /db/t1.csv /db/t2.csv` renames a table within the same database.
+- `tee -a /db/table.csv` appends CSV records as table rows.
 
 The sidecar files are metadata views, not independently writable objects:
 
diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/Cli.java 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/Cli.java
index ad5571efb41..dc42910214c 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/Cli.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/Cli.java
@@ -221,7 +221,7 @@ public class Cli extends AbstractCli {
       connection.setQueryTimeout(queryTimeout);
       properties = connection.getServerProperties();
       timestampPrecision = properties.getTimestampPrecision();
-      createFilesystemShell(ctx, connection).execute(execute);
+      createFilesystemShell(ctx, connection).executeNonInteractive(execute);
       ctx.exit(CODE_OK);
     } catch (SQLException e) {
       ctx.getPrinter()
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/FilesystemShell.java 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/FilesystemShell.java
index 3ac9eca9d94..63c543d40c6 100644
--- 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/FilesystemShell.java
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/FilesystemShell.java
@@ -32,9 +32,14 @@ import org.apache.iotdb.cli.utils.CliContext;
 
 import org.jline.reader.Candidate;
 import org.jline.reader.Completer;
+import org.jline.reader.EndOfFileException;
 import org.jline.reader.LineReader;
 import org.jline.reader.ParsedLine;
 
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -48,8 +53,8 @@ public class FilesystemShell {
   private static final List<String> COMMANDS =
       Arrays.asList(
           "pwd", "ls", "ll", "cd", "stat", "cat", "head", "tail", "wc", 
"grep", "find", "less",
-          "more", "file", "du", "mkdir", "rm", "mv", "cut", "paste", "tree", 
"help", "exit",
-          "quit");
+          "more", "file", "du", "mkdir", "rm", "mv", "cut", "paste", "tree", 
"help", "exit", "quit",
+          "tee");
 
   private final CliContext ctx;
   private final FilesystemSchemaProvider provider;
@@ -133,6 +138,9 @@ public class FilesystemShell {
       case PASTE:
         printRows(provider.read(resolve(command.getPaths()), 
DEFAULT_READ_LIMIT));
         return true;
+      case TEE:
+        append(command.getPath(), false);
+        return true;
       case HELP:
         printHelp();
         return true;
@@ -151,6 +159,15 @@ public class FilesystemShell {
     }
   }
 
+  public boolean executeNonInteractive(String input) throws SQLException {
+    FilesystemCommand command = FilesystemCommandParser.parse(input);
+    if (command.getType() == FilesystemCommand.Type.TEE) {
+      append(command.getPath(), true);
+      return true;
+    }
+    return execute(input);
+  }
+
   public Completer createCompleter() {
     return new FilesystemCompleter();
   }
@@ -363,6 +380,68 @@ public class FilesystemShell {
     mutationProvider.move(source, target);
   }
 
+  private void append(String path, boolean nonInteractive) throws SQLException 
{
+    FsPath resolvedPath = resolve(path);
+    if (!ensureWritable("tee", resolvedPath)) {
+      return;
+    }
+    if (nonInteractive || ctx.getLineReader() == null) {
+      mutationProvider.append(resolvedPath, readStandardInputLines());
+      return;
+    }
+    appendInteractive(resolvedPath);
+  }
+
+  private List<String> readStandardInputLines() throws SQLException {
+    List<String> lines = new ArrayList<>();
+    try {
+      BufferedReader reader =
+          new BufferedReader(new InputStreamReader(ctx.getIn(), 
StandardCharsets.UTF_8));
+      String line;
+      while ((line = reader.readLine()) != null) {
+        lines.add(line);
+      }
+      return lines;
+    } catch (IOException e) {
+      throw new SQLException("Failed to read standard input", e);
+    }
+  }
+
+  private void appendInteractive(FsPath path) throws SQLException {
+    List<String> lines = new ArrayList<>();
+    while (true) {
+      String line;
+      try {
+        line = ctx.getLineReader().readLine("tee> ", null);
+      } catch (EndOfFileException e) {
+        if (!lines.isEmpty()) {
+          ctx.getPrinter().println("tee: use :wq to write or :q! to quit 
without writing");
+        }
+        return;
+      }
+      if (":wq".equals(line)) {
+        try {
+          mutationProvider.append(path, lines);
+          return;
+        } catch (SQLException e) {
+          ctx.getPrinter().println("tee: " + e.getMessage());
+          continue;
+        }
+      }
+      if (":q!".equals(line)) {
+        return;
+      }
+      if (":q".equals(line)) {
+        if (lines.isEmpty()) {
+          return;
+        }
+        ctx.getPrinter().println("tee: use :wq to write or :q! to quit without 
writing");
+        continue;
+      }
+      lines.add(line);
+    }
+  }
+
   private boolean ensureWritable(String command, FsPath path) {
     if (writeEnabled) {
       return true;
@@ -405,6 +484,7 @@ public class FilesystemShell {
     ctx.getPrinter().println("mv <source> <target>");
     ctx.getPrinter().println("cut -d<delimiter> -f<fields> <path>");
     ctx.getPrinter().println("paste <path>...");
+    ctx.getPrinter().println("tee -a <path>");
     ctx.getPrinter().println("tree [-L depth] [path]");
     ctx.getPrinter().println("exit");
   }
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/command/FilesystemCommand.java
 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/command/FilesystemCommand.java
index df86856ab80..2f77a787a54 100644
--- 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/command/FilesystemCommand.java
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/command/FilesystemCommand.java
@@ -45,6 +45,7 @@ public class FilesystemCommand {
     MV,
     CUT,
     PASTE,
+    TEE,
     TREE,
     SQL,
     HELP,
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/command/FilesystemCommandParser.java
 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/command/FilesystemCommandParser.java
index 62d267153b7..75992cf0fc7 100644
--- 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/command/FilesystemCommandParser.java
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/command/FilesystemCommandParser.java
@@ -111,6 +111,9 @@ public class FilesystemCommandParser {
     if ("paste".equals(command)) {
       return parsePaste(tokens);
     }
+    if ("tee".equals(command)) {
+      return parseTee(tokens);
+    }
     if ("tree".equals(command)) {
       return parseTree(tokens);
     }
@@ -136,6 +139,16 @@ public class FilesystemCommandParser {
     return FilesystemCommand.paths(FilesystemCommand.Type.PASTE, paths);
   }
 
+  private static FilesystemCommand parseTee(String[] tokens) {
+    if (tokens.length != 3) {
+      return FilesystemCommand.invalid("Usage: tee -a <path>");
+    }
+    if (!"-a".equals(tokens[1])) {
+      return FilesystemCommand.invalid("Unsupported tee option: " + tokens[1]);
+    }
+    return FilesystemCommand.option(FilesystemCommand.Type.TEE, "-a", 
tokens[2]);
+  }
+
   private static FilesystemCommand parseCut(String[] tokens) {
     String delimiter = DEFAULT_CUT_DELIMITER;
     String fields = "";
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/provider/FilesystemMutationProvider.java
 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/provider/FilesystemMutationProvider.java
index 325025178ed..3d1d3afbd88 100644
--- 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/provider/FilesystemMutationProvider.java
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/provider/FilesystemMutationProvider.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.cli.fs.provider;
 import org.apache.iotdb.cli.fs.path.FsPath;
 
 import java.sql.SQLException;
+import java.util.List;
 
 public interface FilesystemMutationProvider {
 
@@ -30,4 +31,6 @@ public interface FilesystemMutationProvider {
   void remove(FsPath path) throws SQLException;
 
   void move(FsPath source, FsPath target) throws SQLException;
+
+  void append(FsPath path, List<String> lines) throws SQLException;
 }
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/provider/TableCsvAppendPlanner.java
 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/provider/TableCsvAppendPlanner.java
new file mode 100644
index 00000000000..f5a0150c8e0
--- /dev/null
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/provider/TableCsvAppendPlanner.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cli.fs.provider;
+
+import org.apache.iotdb.cli.fs.sql.SqlRow;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+class TableCsvAppendPlanner {
+
+  private static final int INSERT_BATCH_SIZE = 1000;
+  private static final String INVALID_WRITE_OPERATION =
+      "Invalid filesystem write operation for this path";
+  private static final String NULL_MARKER = "\\N";
+  private static final CSVFormat CSV_FORMAT =
+      CSVFormat.DEFAULT.builder().setIgnoreEmptyLines(true).build();
+
+  private TableCsvAppendPlanner() {}
+
+  static List<String> plan(String tablePath, List<SqlRow> schemaRows, 
List<String> lines)
+      throws SQLException {
+    List<CSVRecord> records = parse(lines);
+    if (records.isEmpty()) {
+      return new ArrayList<>();
+    }
+    List<TableColumn> schemaColumns = schemaColumns(schemaRows);
+    if (schemaColumns.isEmpty()) {
+      throw invalidOperation();
+    }
+    ParsedCsv csv = parseCsv(schemaColumns, records);
+    if (csv.rows.isEmpty()) {
+      return new ArrayList<>();
+    }
+    return buildStatements(tablePath, csv.columns, csv.rows);
+  }
+
+  private static List<CSVRecord> parse(List<String> lines) throws SQLException 
{
+    StringBuilder builder = new StringBuilder();
+    for (String line : lines) {
+      if (builder.length() > 0) {
+        builder.append(System.lineSeparator());
+      }
+      builder.append(line);
+    }
+    try {
+      try (CSVParser parser = CSVParser.parse(builder.toString(), CSV_FORMAT)) 
{
+        return parser.getRecords();
+      }
+    } catch (IOException e) {
+      throw new SQLException("Failed to parse CSV input", e);
+    }
+  }
+
+  private static List<TableColumn> schemaColumns(List<SqlRow> rows) {
+    List<TableColumn> columns = new ArrayList<>();
+    for (SqlRow row : rows) {
+      String name = row.get("ColumnName");
+      String type = row.get("DataType");
+      if (name != null && type != null) {
+        columns.add(new TableColumn(name, type));
+      }
+    }
+    return columns;
+  }
+
+  private static ParsedCsv parseCsv(List<TableColumn> schemaColumns, 
List<CSVRecord> records)
+      throws SQLException {
+    Map<String, TableColumn> columnsByName = columnsByName(schemaColumns);
+    CSVRecord first = records.get(0);
+    boolean hasHeader = looksLikeHeader(first, columnsByName);
+    List<TableColumn> selectedColumns;
+    int firstDataIndex;
+    if (hasHeader) {
+      selectedColumns = selectedColumns(first, columnsByName);
+      firstDataIndex = 1;
+    } else {
+      selectedColumns = schemaColumns;
+      firstDataIndex = 0;
+    }
+    if (!containsTime(selectedColumns)) {
+      throw invalidOperation();
+    }
+
+    List<String> rows = new ArrayList<>();
+    for (int i = firstDataIndex; i < records.size(); i++) {
+      rows.add(recordValues(records.get(i), selectedColumns));
+    }
+    return new ParsedCsv(selectedColumns, rows);
+  }
+
+  private static Map<String, TableColumn> columnsByName(List<TableColumn> 
columns) {
+    Map<String, TableColumn> columnsByName = new LinkedHashMap<>();
+    for (TableColumn column : columns) {
+      columnsByName.put(column.name.toLowerCase(Locale.ROOT), column);
+    }
+    return columnsByName;
+  }
+
+  private static boolean looksLikeHeader(CSVRecord record, Map<String, 
TableColumn> columnsByName) {
+    if (record.size() == 0) {
+      return false;
+    }
+    for (String value : record) {
+      if (!columnsByName.containsKey(value.toLowerCase(Locale.ROOT))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private static List<TableColumn> selectedColumns(
+      CSVRecord header, Map<String, TableColumn> columnsByName) throws 
SQLException {
+    List<TableColumn> columns = new ArrayList<>();
+    Set<String> selected = new HashSet<>();
+    for (String value : header) {
+      String normalized = value.toLowerCase(Locale.ROOT);
+      TableColumn column = columnsByName.get(normalized);
+      if (column == null || selected.contains(normalized)) {
+        throw invalidOperation();
+      }
+      selected.add(normalized);
+      columns.add(column);
+    }
+    return columns;
+  }
+
+  private static boolean containsTime(List<TableColumn> columns) {
+    for (TableColumn column : columns) {
+      if (column.isTime()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private static String recordValues(CSVRecord record, List<TableColumn> 
columns)
+      throws SQLException {
+    if (record.size() != columns.size()) {
+      throw invalidOperation();
+    }
+    StringBuilder builder = new StringBuilder("(");
+    for (int i = 0; i < columns.size(); i++) {
+      if (i > 0) {
+        builder.append(", ");
+      }
+      builder.append(sqlValue(columns.get(i), record.get(i)));
+    }
+    builder.append(')');
+    return builder.toString();
+  }
+
+  private static String sqlValue(TableColumn column, String value) throws 
SQLException {
+    if (column.isTime()) {
+      if (isNull(value)) {
+        throw invalidOperation();
+      }
+      return timestampValue(value);
+    }
+    if (NULL_MARKER.equals(value)) {
+      return "NULL";
+    }
+    if (column.isTextual() || column.isDateLike()) {
+      return quote(value);
+    }
+    if (value.isEmpty()) {
+      throw invalidOperation();
+    }
+    return value;
+  }
+
+  private static boolean isNull(String value) {
+    return value == null || value.isEmpty() || NULL_MARKER.equals(value);
+  }
+
+  private static String timestampValue(String value) {
+    if (value.matches("[-+]?\\d+")) {
+      return value;
+    }
+    return quote(value);
+  }
+
+  private static String quote(String value) {
+    return "'" + value.replace("'", "''") + "'";
+  }
+
+  private static List<String> buildStatements(
+      String tablePath, List<TableColumn> columns, List<String> rows) {
+    List<String> statements = new ArrayList<>();
+    for (int start = 0; start < rows.size(); start += INSERT_BATCH_SIZE) {
+      int end = Math.min(start + INSERT_BATCH_SIZE, rows.size());
+      statements.add(
+          "INSERT INTO "
+              + identifierPath(tablePath)
+              + "("
+              + columnList(columns)
+              + ") VALUES "
+              + join(rows.subList(start, end), ", "));
+    }
+    return statements;
+  }
+
+  private static String identifierPath(String tablePath) {
+    String[] parts = tablePath.split("\\.", -1);
+    StringBuilder builder = new StringBuilder();
+    for (String part : parts) {
+      if (builder.length() > 0) {
+        builder.append('.');
+      }
+      builder.append(identifier(part));
+    }
+    return builder.toString();
+  }
+
+  private static String columnList(List<TableColumn> columns) {
+    List<String> names = new ArrayList<>();
+    for (TableColumn column : columns) {
+      names.add(identifier(column.name));
+    }
+    return join(names, ", ");
+  }
+
+  private static String identifier(String value) {
+    if (value.matches("[A-Za-z_][A-Za-z0-9_]*")) {
+      return value;
+    }
+    return "\"" + value.replace("\"", "\"\"") + "\"";
+  }
+
+  private static String join(List<String> values, String delimiter) {
+    StringBuilder builder = new StringBuilder();
+    for (String value : values) {
+      if (builder.length() > 0) {
+        builder.append(delimiter);
+      }
+      builder.append(value);
+    }
+    return builder.toString();
+  }
+
+  private static SQLException invalidOperation() {
+    return new SQLException(INVALID_WRITE_OPERATION);
+  }
+
+  private static class ParsedCsv {
+    private final List<TableColumn> columns;
+    private final List<String> rows;
+
+    private ParsedCsv(List<TableColumn> columns, List<String> rows) {
+      this.columns = columns;
+      this.rows = rows;
+    }
+  }
+
+  private static class TableColumn {
+    private final String name;
+    private final String type;
+
+    private TableColumn(String name, String type) {
+      this.name = name;
+      this.type = type.toUpperCase(Locale.ROOT);
+    }
+
+    private boolean isTime() {
+      return "TIME".equalsIgnoreCase(name);
+    }
+
+    private boolean isTextual() {
+      return type.contains("STRING") || type.contains("TEXT") || 
type.contains("BLOB");
+    }
+
+    private boolean isDateLike() {
+      return type.contains("DATE") || type.contains("TIMESTAMP");
+    }
+  }
+}
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/provider/TableFilesystemMutationProvider.java
 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/provider/TableFilesystemMutationProvider.java
index d8220fee7f7..8a0e79ca58a 100644
--- 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/provider/TableFilesystemMutationProvider.java
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/provider/TableFilesystemMutationProvider.java
@@ -64,6 +64,23 @@ public class TableFilesystemMutationProvider implements 
FilesystemMutationProvid
     executor.execute("ALTER TABLE " + toTablePath(source) + " RENAME TO " + 
tableName(target));
   }
 
+  @Override
+  public void append(FsPath path, List<String> lines) throws SQLException {
+    if (!isDataFile(path)) {
+      throw invalidOperation();
+    }
+    if (lines == null || lines.isEmpty()) {
+      return;
+    }
+    String tablePath = toTablePath(path);
+    List<String> statements =
+        TableCsvAppendPlanner.plan(
+            tablePath, executor.query("DESC " + tablePath + " DETAILS"), 
lines);
+    for (String statement : statements) {
+      executor.execute(statement);
+    }
+  }
+
   private static SQLException invalidOperation() {
     return new SQLException(INVALID_WRITE_OPERATION);
   }
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/provider/UnsupportedFilesystemMutationProvider.java
 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/provider/UnsupportedFilesystemMutationProvider.java
index 9f4a08ceb01..2d061c32e4e 100644
--- 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/provider/UnsupportedFilesystemMutationProvider.java
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/provider/UnsupportedFilesystemMutationProvider.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.cli.fs.provider;
 import org.apache.iotdb.cli.fs.path.FsPath;
 
 import java.sql.SQLException;
+import java.util.List;
 
 public class UnsupportedFilesystemMutationProvider implements 
FilesystemMutationProvider {
 
@@ -42,6 +43,11 @@ public class UnsupportedFilesystemMutationProvider 
implements FilesystemMutation
     throw unsupported();
   }
 
+  @Override
+  public void append(FsPath path, List<String> lines) throws SQLException {
+    throw unsupported();
+  }
+
   private static SQLException unsupported() {
     return new SQLException(UNSUPPORTED);
   }
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/utils/JlineUtils.java 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/utils/JlineUtils.java
index aa68d220e77..704e10f6600 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/utils/JlineUtils.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/utils/JlineUtils.java
@@ -153,7 +153,8 @@ public class JlineUtils {
     if ("filesystem".equalsIgnoreCase(accessMode)) {
       return new StringsCompleter(
           "pwd", "ls", "ll", "cd", "stat", "cat", "head", "tail", "wc", 
"grep", "find", "less",
-          "more", "file", "du", "mkdir", "rm", "mv", "paste", "tree", "help", 
"exit", "quit");
+          "more", "file", "du", "mkdir", "rm", "mv", "cut", "paste", "tee", 
"tree", "help", "exit",
+          "quit");
     }
     return new StringsCompleter(SQL_KEYWORDS);
   }
diff --git 
a/iotdb-client/cli/src/test/java/org/apache/iotdb/cli/fs/FilesystemShellTest.java
 
b/iotdb-client/cli/src/test/java/org/apache/iotdb/cli/fs/FilesystemShellTest.java
index 3911b65f382..5566022a31c 100644
--- 
a/iotdb-client/cli/src/test/java/org/apache/iotdb/cli/fs/FilesystemShellTest.java
+++ 
b/iotdb-client/cli/src/test/java/org/apache/iotdb/cli/fs/FilesystemShellTest.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.cli.utils.CliContext;
 
 import org.jline.reader.Candidate;
 import org.jline.reader.Completer;
+import org.jline.reader.LineReader;
 import org.jline.reader.ParsedLine;
 import org.jline.reader.impl.DefaultParser;
 import org.junit.Before;
@@ -58,6 +59,7 @@ public class FilesystemShellTest {
 
   @Mock private FilesystemSchemaProvider provider;
   @Mock private FilesystemMutationProvider mutationProvider;
+  @Mock private LineReader lineReader;
 
   private ByteArrayOutputStream out;
   private FilesystemShell shell;
@@ -202,6 +204,58 @@ public class FilesystemShellTest {
         .move(FsPath.absolute("/db1/table1.csv"), 
FsPath.absolute("/db1/table2.csv"));
   }
 
+  @Test
+  public void executeTeeRejectsReadOnlyMode() throws SQLException {
+    assertTrue(shell.execute("tee -a /db1/table1.csv"));
+
+    assertTrue(out.toString().contains("tee: /db1/table1.csv: Read-only file 
system"));
+    verifyZeroInteractions(mutationProvider);
+  }
+
+  @Test
+  public void executeTeeReadsInteractiveBufferUntilWriteQuit() throws 
SQLException {
+    CliContext ctx = shellContext();
+    ctx.setLineReader(lineReader);
+    shell = new FilesystemShell(ctx, provider, mutationProvider, true);
+    when(lineReader.readLine("tee> ", null)).thenReturn("time,key,value", 
"1,spricoder,2.0", ":wq");
+
+    assertTrue(shell.execute("tee -a /db1/table1.csv"));
+
+    verify(mutationProvider)
+        .append(
+            FsPath.absolute("/db1/table1.csv"), 
Arrays.asList("time,key,value", "1,spricoder,2.0"));
+  }
+
+  @Test
+  public void executeTeeQuitWarnsBeforeDiscardingInteractiveBuffer() throws 
SQLException {
+    CliContext ctx = shellContext();
+    ctx.setLineReader(lineReader);
+    shell = new FilesystemShell(ctx, provider, mutationProvider, true);
+    when(lineReader.readLine("tee> ", null)).thenReturn("time,key,value", 
":q", ":q!");
+
+    assertTrue(shell.execute("tee -a /db1/table1.csv"));
+
+    assertTrue(out.toString().contains("tee: use :wq to write or :q! to quit 
without writing"));
+    verifyZeroInteractions(mutationProvider);
+  }
+
+  @Test
+  public void executeTeeNonInteractiveReadsStandardInputUntilEof() throws 
SQLException {
+    CliContext ctx =
+        new CliContext(
+            new 
ByteArrayInputStream("time,key,value\n1,spricoder,2.0\n".getBytes()),
+            new PrintStream(out),
+            System.err,
+            ExitType.EXCEPTION);
+    shell = new FilesystemShell(ctx, provider, mutationProvider, true);
+
+    assertTrue(shell.executeNonInteractive("tee -a /db1/table1.csv"));
+
+    verify(mutationProvider)
+        .append(
+            FsPath.absolute("/db1/table1.csv"), 
Arrays.asList("time,key,value", "1,spricoder,2.0"));
+  }
+
   @Test
   public void executeTreePrintsChildrenUntilDepth() throws SQLException {
     when(provider.list(FsPath.absolute("/")))
diff --git 
a/iotdb-client/cli/src/test/java/org/apache/iotdb/cli/fs/command/FilesystemCommandParserTest.java
 
b/iotdb-client/cli/src/test/java/org/apache/iotdb/cli/fs/command/FilesystemCommandParserTest.java
index 8039335d797..eb97455c369 100644
--- 
a/iotdb-client/cli/src/test/java/org/apache/iotdb/cli/fs/command/FilesystemCommandParserTest.java
+++ 
b/iotdb-client/cli/src/test/java/org/apache/iotdb/cli/fs/command/FilesystemCommandParserTest.java
@@ -194,6 +194,26 @@ public class FilesystemCommandParserTest {
         FilesystemCommand.Type.INVALID, FilesystemCommandParser.parse("cut 
-f2,3").getType());
   }
 
+  @Test
+  public void parseTeeAppendPath() {
+    FilesystemCommand command = FilesystemCommandParser.parse("tee -a 
/db1/table1.csv");
+
+    assertEquals(FilesystemCommand.Type.TEE, command.getType());
+    assertEquals("-a", command.getOption());
+    assertEquals("/db1/table1.csv", command.getPath());
+  }
+
+  @Test
+  public void parseTeeRequiresAppendOptionAndPath() {
+    assertEquals(
+        FilesystemCommand.Type.INVALID,
+        FilesystemCommandParser.parse("tee /db1/table1.csv").getType());
+    assertEquals(FilesystemCommand.Type.INVALID, 
FilesystemCommandParser.parse("tee -a").getType());
+    assertEquals(
+        FilesystemCommand.Type.INVALID,
+        FilesystemCommandParser.parse("tee -p /db1/table1.csv").getType());
+  }
+
   @Test
   public void parseWriteCommands() {
     FilesystemCommand mkdir = FilesystemCommandParser.parse("mkdir /db1");
diff --git 
a/iotdb-client/cli/src/test/java/org/apache/iotdb/cli/fs/provider/TableFilesystemMutationProviderTest.java
 
b/iotdb-client/cli/src/test/java/org/apache/iotdb/cli/fs/provider/TableFilesystemMutationProviderTest.java
index 2403bffffb3..2f23c1d4f1d 100644
--- 
a/iotdb-client/cli/src/test/java/org/apache/iotdb/cli/fs/provider/TableFilesystemMutationProviderTest.java
+++ 
b/iotdb-client/cli/src/test/java/org/apache/iotdb/cli/fs/provider/TableFilesystemMutationProviderTest.java
@@ -28,10 +28,12 @@ import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
 import java.sql.SQLException;
+import java.util.Arrays;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class TableFilesystemMutationProviderTest {
 
@@ -98,6 +100,61 @@ public class TableFilesystemMutationProviderTest {
             provider.move(FsPath.absolute("/db1/table1.csv"), 
FsPath.absolute("/db2/table1.csv")));
   }
 
+  @Test
+  public void appendCsvWithHeaderBuildsMultiRowInsert() throws SQLException {
+    mockTableSchema();
+
+    provider.append(
+        FsPath.absolute("/db1/table1.csv"),
+        Arrays.asList("time,key,value", "1,spricoder,2.0", "2,other,\\N"));
+
+    verify(executor).query("DESC db1.table1 DETAILS");
+    verify(executor)
+        .execute(
+            "INSERT INTO db1.table1(time, key, value) VALUES "
+                + "(1, 'spricoder', 2.0), (2, 'other', NULL)");
+  }
+
+  @Test
+  public void appendCsvWithoutHeaderUsesFullSchemaOrder() throws SQLException {
+    mockTableSchema();
+
+    provider.append(FsPath.absolute("/db1/table1.csv"), 
Arrays.asList("1,spricoder,2.0"));
+
+    verify(executor)
+        .execute("INSERT INTO db1.table1(time, key, value) VALUES (1, 
'spricoder', 2.0)");
+  }
+
+  @Test
+  public void appendCsvWithHeaderAllowsPartialColumns() throws SQLException {
+    mockTableSchema();
+
+    provider.append(FsPath.absolute("/db1/table1.csv"), 
Arrays.asList("time,value", "1,2.0"));
+
+    verify(executor).execute("INSERT INTO db1.table1(time, value) VALUES (1, 
2.0)");
+  }
+
+  @Test
+  public void appendRejectsSidecarAndMissingTime() throws SQLException {
+    assertInvalidOperation(
+        () -> provider.append(FsPath.absolute("/db1/table1.schema"), 
Arrays.asList("time", "1")));
+
+    mockTableSchema();
+    assertInvalidOperation(
+        () -> provider.append(FsPath.absolute("/db1/table1.csv"), 
Arrays.asList("key", "a")));
+  }
+
+  private void mockTableSchema() throws SQLException {
+    when(executor.query("DESC db1.table1 DETAILS"))
+        .thenReturn(
+            org.apache.iotdb.cli.fs.sql.SqlRow.list(
+                org.apache.iotdb.cli.fs.sql.SqlRow.of(
+                    "ColumnName", "time", "DataType", "TIMESTAMP"),
+                org.apache.iotdb.cli.fs.sql.SqlRow.of("ColumnName", "key", 
"DataType", "STRING"),
+                org.apache.iotdb.cli.fs.sql.SqlRow.of(
+                    "ColumnName", "value", "DataType", "DOUBLE")));
+  }
+
   private static void assertInvalidOperation(SqlOperation operation) throws 
SQLException {
     try {
       operation.run();


Reply via email to