This is an automated email from the ASF dual-hosted git repository. SpriCoder pushed a commit to branch fs/inner-view in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 068425994459ac28c90fca4a7c0cc9be2e51722c Author: spricoder <[email protected]> AuthorDate: Thu Apr 30 18:31:33 2026 +0800 add tee --- .../plans/2026-04-29-cli-filesystem-mode.md | 39 +++ .../specs/2026-04-29-cli-filesystem-mode-design.md | 102 +++++++ .../src/main/java/org/apache/iotdb/cli/Cli.java | 2 +- .../org/apache/iotdb/cli/fs/FilesystemShell.java | 84 +++++- .../iotdb/cli/fs/command/FilesystemCommand.java | 1 + .../cli/fs/command/FilesystemCommandParser.java | 13 + .../fs/provider/FilesystemMutationProvider.java | 3 + .../cli/fs/provider/TableCsvAppendPlanner.java | 304 +++++++++++++++++++++ .../provider/TableFilesystemMutationProvider.java | 17 ++ .../UnsupportedFilesystemMutationProvider.java | 6 + .../org/apache/iotdb/cli/utils/JlineUtils.java | 3 +- .../apache/iotdb/cli/fs/FilesystemShellTest.java | 54 ++++ .../fs/command/FilesystemCommandParserTest.java | 20 ++ .../TableFilesystemMutationProviderTest.java | 57 ++++ 14 files changed, 701 insertions(+), 4 deletions(-) diff --git a/docs/superpowers/plans/2026-04-29-cli-filesystem-mode.md b/docs/superpowers/plans/2026-04-29-cli-filesystem-mode.md index 081a5ce8748..0a85d3525b5 100644 --- a/docs/superpowers/plans/2026-04-29-cli-filesystem-mode.md +++ b/docs/superpowers/plans/2026-04-29-cli-filesystem-mode.md @@ -102,6 +102,7 @@ These notes capture follow-up implementation experience for quickly resuming thi - `mkdir /db` creates a database. - `rm /db/table.csv` drops a table. - `mv /db/t1.csv /db/t2.csv` renames a table inside the same database. + - `tee -a /db/table.csv` appends CSV records as table rows. - Forbid `rm` or `mv` of `/db/table.schema` and `/db/table.meta`. - Forbid `rm /db`. - Forbid cross-database rename such as `mv /db1/t.csv /db2/t.csv`. @@ -117,6 +118,43 @@ These notes capture follow-up implementation experience for quickly resuming thi the server returned `550`; the unchecked propagation exited the CLI. Keep a regression test for this behavior. +## Data Append Design + +The first table data-write implementation supports only append semantics over CSV table files: + +```bash +tee -a /db/table.csv +``` + +Design decisions already approved: + +- Only append writes are supported. Truncate, overwrite, random writes, row deletion, and operating + system redirection such as `>> /db/table.csv` are out of scope. +- The target must be a table-mode data file `/<database>/<table>.csv`. +- `tee` without `-a`, sidecar files, legacy column paths, and tree-mode paths are rejected. +- Writes require `--access_mode filesystem --sql_dialect table --fs_write_mode enabled`. +- Non-interactive mode reads CSV from stdin until EOF and then submits. +- Interactive mode enters an append buffer. `:wq` validates and submits, `:q!` discards, and `:q` + exits only if the buffer is empty. +- The first version does not support `:w`. +- CSV parsing should use Apache Commons CSV or another proven parser already available to the + module. +- Header and headerless input are both supported. +- Header input may write a subset of columns but must include `time`. +- Headerless input must provide all columns in the current `/db/table.csv` output order. +- Every record must explicitly provide `time`; `time` cannot be empty or `\N`. +- `\N` is the explicit SQL `NULL` marker for non-time columns. Empty fields are not automatically + NULL. +- Client-side validation covers path, write mode, CSV shape, known columns, and required `time`. +- IoTDB performs type validation, timestamp-format validation, permissions checks, and final insert + semantics. +- Validated records map to SQL of the form + `INSERT INTO db.table(col1,col2,...) VALUES (...), (...), ...`. +- Insert statements should be chunked, for example 1000 records per statement. +- The full buffer must pass client validation before any insert is attempted. +- Server-side insert failure is not compensated or retried automatically. Interactive mode keeps the + buffer; non-interactive mode exits with failure. + ## Supported Command Quick Reference | Command | Description | Example | @@ -142,6 +180,7 @@ These notes capture follow-up implementation experience for quickly resuming thi | `mkdir <path>` | Write-gated; in table mode with writes enabled, creates a database. | `mkdir /newdb` | | `rm <path>` | Write-gated; in table mode with writes enabled, only table CSV drop is allowed. | `rm /db/table.csv` | | `mv <source> <target>` | Write-gated; in table mode with writes enabled, only same-database table CSV rename is allowed. | `mv /db/t1.csv /db/t2.csv` | +| `tee -a <path>` | Write-gated; in table mode with writes enabled, appends CSV input to a table data file. | `tee -a /db/table.csv` | | `help` | Print filesystem-mode help. | `help` | | `exit` / `quit` | Exit filesystem mode. | `exit` | diff --git a/docs/superpowers/specs/2026-04-29-cli-filesystem-mode-design.md b/docs/superpowers/specs/2026-04-29-cli-filesystem-mode-design.md index 2de2d8a71c6..baeee519fd2 100644 --- a/docs/superpowers/specs/2026-04-29-cli-filesystem-mode-design.md +++ b/docs/superpowers/specs/2026-04-29-cli-filesystem-mode-design.md @@ -219,6 +219,7 @@ semantics wherever the same command exists. Provider support can still vary by d | `mkdir <path>` | Write-gated command. With table mode and `--fs_write_mode enabled`, `mkdir /db` creates a database. Otherwise it returns a read-only or unsupported error. | `mkdir /newdb` | | `rm <path>` | Write-gated command. With table mode and `--fs_write_mode enabled`, only `rm /db/table.csv` is allowed and maps to table drop. | `rm /db/table.csv` | | `mv <source> <target>` | Write-gated command. With table mode and `--fs_write_mode enabled`, only same-database table CSV rename is allowed. | `mv /db/t1.csv /db/t2.csv` | +| `tee -a <path>` | Write-gated append command. With table mode and `--fs_write_mode enabled`, only `tee -a /db/table.csv` appends CSV records as rows. | `tee -a /db/table.csv` | | `help` | Print filesystem-mode help. | `help` | | `exit` / `quit` | Exit filesystem mode. | `exit` | @@ -251,6 +252,7 @@ Raw SQL should be run in the default SQL access mode. | `stat /db/table.meta` | `SHOW TABLES FROM db`, filtered to the table and rendered as filesystem metadata for the metadata sidecar. | | `cat /db/table.csv` | `SELECT * FROM db.table LIMIT <limit>`, formatted as CSV records. | | `cut -d, -f2,3 /db/table.csv` | Delimiter-based text field projection over the CSV records. | +| `tee -a /db/table.csv` | Parse CSV input with Apache Commons CSV, validate columns and required `time`, then execute chunked `INSERT INTO db.table(...) VALUES ...`. | | `cat /db/table.schema` | `DESC db.table DETAILS`, formatted as CSV with IoTDB result columns preserved. | | `cat /db/table.meta` | `SHOW TABLES DETAILS FROM db`, filtered to the table and formatted as CSV with IoTDB result columns preserved. | | `stat /db/table/col` | Legacy compatibility: `DESC db.table DETAILS`, filtered to the column. | @@ -321,6 +323,102 @@ example, if `cat time` is resolved from `/testtest` to `/testtest/time`, table m table path and may receive a server error such as `550: Table 'testtest.time' does not exist`. That error should be printed as `cat: 550: ...`, then the prompt should continue. +## Append Data Write Semantics + +Table data writes use Unix append semantics over the table CSV file: + +```bash +tee -a /db/table.csv +``` + +Only append writes are in scope for the first data-write implementation. Truncation, overwrite, +random writes, partial row deletion, and shell redirection such as `>> /db/table.csv` are out of +scope because filesystem mode is an IoTDB CLI command surface, not an operating-system mount. + +Append writes are allowed only when all of these conditions hold: + +- CLI is running with `--access_mode filesystem`. +- CLI is running with `--sql_dialect table`. +- CLI is running with `--fs_write_mode enabled`. +- Target path is exactly a table data file of the form `/<database>/<table>.csv`. +- Command is `tee -a`; `tee` without `-a` is rejected. + +The sidecar files remain read-only metadata views: + +- `tee -a /db/table.schema` is rejected. +- `tee -a /db/table.meta` is rejected. +- Legacy column paths such as `/db/table/col` are not writable append targets. +- Tree-mode paths remain non-writable even when `--fs_write_mode enabled` is set. + +### Input Modes + +Non-interactive mode reads CSV from standard input until EOF, then submits once: + +```bash +printf 'time,key,value\n1,spricoder,2.0\n' | \ + start-cli.sh -h 127.0.0.1 -p 6667 -u root -pw root \ + --sql_dialect table --access_mode filesystem --fs_write_mode enabled \ + -e 'tee -a /db/table.csv' +``` + +Interactive mode enters an append buffer: + +```text +IoTDB:fs> tee -a /db/table.csv +time,key,value +1,spricoder,2.0 +2,spricoder,1.5 +:wq +``` + +The append buffer supports a minimal Vim-style command set: + +- `:wq`: validate, submit, and exit the append buffer after a successful write. +- `:q!`: discard the buffer and exit. +- `:q`: exit only when the buffer is empty; otherwise print guidance to use `:wq` or `:q!`. + +The first version intentionally does not support `:w`, because writing while keeping the buffer +open creates ambiguity around repeated submissions and buffer clearing. + +### CSV Input Rules + +Append input is CSV and is parsed with a proven CSV parser, not by ad hoc string splitting. + +- Both header and headerless input are supported. +- Header input may provide a subset of columns, but it must include `time`. +- Headerless input must provide all columns in the current `/db/table.csv` output order. +- Every appended record must explicitly provide `time`. +- `time` must not be empty and must not be `\N`. +- `\N` represents SQL `NULL` for non-time columns. +- Empty fields are not automatically treated as `NULL`. +- Client-side validation checks path, write mode, CSV shape, known columns, and required `time`. +- IoTDB remains responsible for type validation, timestamp-format validation, permissions, and + final insert semantics. + +### SQL Mapping And Failure Semantics + +The provider should map validated append input to table-model SQL: + +```sql +INSERT INTO db.table(col1,col2,...) VALUES (...), (...), ... +``` + +Implementation rules: + +- Use `DESC db.table DETAILS` or an equivalent schema query to get the table columns and output + order. +- Use SQL identifier rendering and SQL literal escaping helpers; never concatenate raw user values + into SQL. +- Convert `\N` to `NULL`. +- Submit rows in fixed-size chunks, for example 1000 records per `INSERT`, to avoid oversized SQL + statements. +- Client-side validation must pass for the full buffer before any write is attempted. +- If a server-side insert fails, the CLI does not compensate or retry automatically. Server state is + whatever IoTDB actually committed. +- In interactive mode, a server-side failure keeps the append buffer so the user can correct input + or abandon it with `:q!`. +- In non-interactive mode, a failure prints a command-prefixed error and exits with failure. + ## Proposed Code Structure New code should live under `org.apache.iotdb.cli.fs`. @@ -332,6 +430,7 @@ New code should live under `org.apache.iotdb.cli.fs`. - `node/FsNode`, `node/FsNodeType`, `node/FsNodeMetadata`: typed metadata model. - `provider/FilesystemSchemaProvider`: schema and data read provider interface. - `provider/FilesystemMutationProvider`: write-gated mutation provider interface. +- `provider/TableCsvAppendPlanner` or equivalent helper: CSV append validation and SQL planning. - `provider/TreeFilesystemSchemaProvider`: tree-model SQL mapping. - `provider/TableFilesystemSchemaProvider`: table-model SQL mapping. - `provider/TableFilesystemMutationProvider`: opt-in table-model write mapping. @@ -363,6 +462,8 @@ The mutation provider owns the current write-gated operations: - `mkdir(FsPath path)` - `remove(FsPath path)` - `move(FsPath source, FsPath target)` +- `append(FsPath path, List<String> csvLines)` or an equivalent table-mode append method for + `tee -a /db/table.csv` When writes are disabled, `FilesystemShell` rejects `mkdir`, `rm`, and `mv` before calling the mutation provider. When writes are enabled, table mode uses `TableFilesystemMutationProvider`; tree @@ -373,6 +474,7 @@ The current table-model write boundary is intentionally narrow: - `mkdir /db` creates a database directory. - `rm /db/table.csv` drops the table data file and therefore the table. - `mv /db/t1.csv /db/t2.csv` renames a table within the same database. +- `tee -a /db/table.csv` appends CSV records as table rows. The sidecar files are metadata views, not independently writable objects: diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/Cli.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/Cli.java index ad5571efb41..dc42910214c 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/Cli.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/Cli.java @@ -221,7 +221,7 @@ public class Cli extends AbstractCli { connection.setQueryTimeout(queryTimeout); properties = connection.getServerProperties(); timestampPrecision = properties.getTimestampPrecision(); - createFilesystemShell(ctx, connection).execute(execute); + createFilesystemShell(ctx, connection).executeNonInteractive(execute); ctx.exit(CODE_OK); } catch (SQLException e) { ctx.getPrinter() diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/FilesystemShell.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/FilesystemShell.java index 3ac9eca9d94..63c543d40c6 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/FilesystemShell.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/FilesystemShell.java @@ -32,9 +32,14 @@ import org.apache.iotdb.cli.utils.CliContext; import org.jline.reader.Candidate; import org.jline.reader.Completer; +import org.jline.reader.EndOfFileException; import org.jline.reader.LineReader; import org.jline.reader.ParsedLine; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; @@ -48,8 +53,8 @@ public class FilesystemShell { private static final List<String> COMMANDS = Arrays.asList( "pwd", "ls", "ll", "cd", "stat", "cat", "head", "tail", "wc", "grep", "find", "less", - "more", "file", "du", "mkdir", "rm", "mv", "cut", "paste", "tree", "help", "exit", - "quit"); + "more", "file", "du", "mkdir", "rm", "mv", "cut", "paste", "tree", "help", "exit", "quit", + "tee"); private final CliContext ctx; private final FilesystemSchemaProvider provider; @@ -133,6 +138,9 @@ public class FilesystemShell { case PASTE: printRows(provider.read(resolve(command.getPaths()), DEFAULT_READ_LIMIT)); return true; + case TEE: + append(command.getPath(), false); + return true; case HELP: printHelp(); return true; @@ -151,6 +159,15 @@ public class FilesystemShell { } } + public boolean executeNonInteractive(String input) throws SQLException { + FilesystemCommand command = FilesystemCommandParser.parse(input); + if (command.getType() == FilesystemCommand.Type.TEE) { + append(command.getPath(), true); + return true; + } + return execute(input); + } + public Completer createCompleter() { return new FilesystemCompleter(); } @@ -363,6 +380,68 @@ public class FilesystemShell { mutationProvider.move(source, target); } + private void append(String path, boolean nonInteractive) throws SQLException { + FsPath resolvedPath = resolve(path); + if (!ensureWritable("tee", resolvedPath)) { + return; + } + if (nonInteractive || ctx.getLineReader() == null) { + mutationProvider.append(resolvedPath, readStandardInputLines()); + return; + } + appendInteractive(resolvedPath); + } + + private List<String> readStandardInputLines() throws SQLException { + List<String> lines = new ArrayList<>(); + try { + BufferedReader reader = + new BufferedReader(new InputStreamReader(ctx.getIn(), StandardCharsets.UTF_8)); + String line; + while ((line = reader.readLine()) != null) { + lines.add(line); + } + return lines; + } catch (IOException e) { + throw new SQLException("Failed to read standard input", e); + } + } + + private void appendInteractive(FsPath path) throws SQLException { + List<String> lines = new ArrayList<>(); + while (true) { + String line; + try { + line = ctx.getLineReader().readLine("tee> ", null); + } catch (EndOfFileException e) { + if (!lines.isEmpty()) { + ctx.getPrinter().println("tee: use :wq to write or :q! to quit without writing"); + } + return; + } + if (":wq".equals(line)) { + try { + mutationProvider.append(path, lines); + return; + } catch (SQLException e) { + ctx.getPrinter().println("tee: " + e.getMessage()); + continue; + } + } + if (":q!".equals(line)) { + return; + } + if (":q".equals(line)) { + if (lines.isEmpty()) { + return; + } + ctx.getPrinter().println("tee: use :wq to write or :q! to quit without writing"); + continue; + } + lines.add(line); + } + } + private boolean ensureWritable(String command, FsPath path) { if (writeEnabled) { return true; @@ -405,6 +484,7 @@ public class FilesystemShell { ctx.getPrinter().println("mv <source> <target>"); ctx.getPrinter().println("cut -d<delimiter> -f<fields> <path>"); ctx.getPrinter().println("paste <path>..."); + ctx.getPrinter().println("tee -a <path>"); ctx.getPrinter().println("tree [-L depth] [path]"); ctx.getPrinter().println("exit"); } diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/command/FilesystemCommand.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/command/FilesystemCommand.java index df86856ab80..2f77a787a54 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/command/FilesystemCommand.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/command/FilesystemCommand.java @@ -45,6 +45,7 @@ public class FilesystemCommand { MV, CUT, PASTE, + TEE, TREE, SQL, HELP, diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/command/FilesystemCommandParser.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/command/FilesystemCommandParser.java index 62d267153b7..75992cf0fc7 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/command/FilesystemCommandParser.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/command/FilesystemCommandParser.java @@ -111,6 +111,9 @@ public class FilesystemCommandParser { if ("paste".equals(command)) { return parsePaste(tokens); } + if ("tee".equals(command)) { + return parseTee(tokens); + } if ("tree".equals(command)) { return parseTree(tokens); } @@ -136,6 +139,16 @@ public class FilesystemCommandParser { return FilesystemCommand.paths(FilesystemCommand.Type.PASTE, paths); } + private static FilesystemCommand parseTee(String[] tokens) { + if (tokens.length != 3) { + return FilesystemCommand.invalid("Usage: tee -a <path>"); + } + if (!"-a".equals(tokens[1])) { + return FilesystemCommand.invalid("Unsupported tee option: " + tokens[1]); + } + return FilesystemCommand.option(FilesystemCommand.Type.TEE, "-a", tokens[2]); + } + private static FilesystemCommand parseCut(String[] tokens) { String delimiter = DEFAULT_CUT_DELIMITER; String fields = ""; diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/provider/FilesystemMutationProvider.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/provider/FilesystemMutationProvider.java index 325025178ed..3d1d3afbd88 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/provider/FilesystemMutationProvider.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/provider/FilesystemMutationProvider.java @@ -22,6 +22,7 @@ package org.apache.iotdb.cli.fs.provider; import org.apache.iotdb.cli.fs.path.FsPath; import java.sql.SQLException; +import java.util.List; public interface FilesystemMutationProvider { @@ -30,4 +31,6 @@ public interface FilesystemMutationProvider { void remove(FsPath path) throws SQLException; void move(FsPath source, FsPath target) throws SQLException; + + void append(FsPath path, List<String> lines) throws SQLException; } diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/provider/TableCsvAppendPlanner.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/provider/TableCsvAppendPlanner.java new file mode 100644 index 00000000000..f5a0150c8e0 --- /dev/null +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/provider/TableCsvAppendPlanner.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.cli.fs.provider; + +import org.apache.iotdb.cli.fs.sql.SqlRow; + +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; + +class TableCsvAppendPlanner { + + private static final int INSERT_BATCH_SIZE = 1000; + private static final String INVALID_WRITE_OPERATION = + "Invalid filesystem write operation for this path"; + private static final String NULL_MARKER = "\\N"; + private static final CSVFormat CSV_FORMAT = + CSVFormat.DEFAULT.builder().setIgnoreEmptyLines(true).build(); + + private TableCsvAppendPlanner() {} + + static List<String> plan(String tablePath, List<SqlRow> schemaRows, List<String> lines) + throws SQLException { + List<CSVRecord> records = parse(lines); + if (records.isEmpty()) { + return new ArrayList<>(); + } + List<TableColumn> schemaColumns = schemaColumns(schemaRows); + if (schemaColumns.isEmpty()) { + throw invalidOperation(); + } + ParsedCsv csv = parseCsv(schemaColumns, records); + if (csv.rows.isEmpty()) { + return new ArrayList<>(); + } + return buildStatements(tablePath, csv.columns, csv.rows); + } + + private static List<CSVRecord> parse(List<String> lines) throws SQLException { + StringBuilder builder = new StringBuilder(); + for (String line : lines) { + if (builder.length() > 0) { + builder.append(System.lineSeparator()); + } + builder.append(line); + } + try { + try (CSVParser parser = CSVParser.parse(builder.toString(), CSV_FORMAT)) { + return parser.getRecords(); + } + } catch (IOException e) { + throw new SQLException("Failed to parse CSV input", e); + } + } + + private static List<TableColumn> schemaColumns(List<SqlRow> rows) { + List<TableColumn> columns = new ArrayList<>(); + for (SqlRow row : rows) { + String name = row.get("ColumnName"); + String type = row.get("DataType"); + if (name != null && type != null) { + columns.add(new TableColumn(name, type)); + } + } + return columns; + } + + private static ParsedCsv parseCsv(List<TableColumn> schemaColumns, List<CSVRecord> records) + throws SQLException { + Map<String, TableColumn> columnsByName = columnsByName(schemaColumns); + CSVRecord first = records.get(0); + boolean hasHeader = looksLikeHeader(first, columnsByName); + List<TableColumn> selectedColumns; + int firstDataIndex; + if (hasHeader) { + selectedColumns = selectedColumns(first, columnsByName); + firstDataIndex = 1; + } else { + selectedColumns = schemaColumns; + firstDataIndex = 0; + } + if (!containsTime(selectedColumns)) { + throw invalidOperation(); + } + + List<String> rows = new ArrayList<>(); + for (int i = firstDataIndex; i < records.size(); i++) { + rows.add(recordValues(records.get(i), selectedColumns)); + } + return new ParsedCsv(selectedColumns, rows); + } + + private static Map<String, TableColumn> columnsByName(List<TableColumn> columns) { + Map<String, TableColumn> columnsByName = new LinkedHashMap<>(); + for (TableColumn column : columns) { + columnsByName.put(column.name.toLowerCase(Locale.ROOT), column); + } + return columnsByName; + } + + private static boolean looksLikeHeader(CSVRecord record, Map<String, TableColumn> columnsByName) { + if (record.size() == 0) { + return false; + } + for (String value : record) { + if (!columnsByName.containsKey(value.toLowerCase(Locale.ROOT))) { + return false; + } + } + return true; + } + + private static List<TableColumn> selectedColumns( + CSVRecord header, Map<String, TableColumn> columnsByName) throws SQLException { + List<TableColumn> columns = new ArrayList<>(); + Set<String> selected = new HashSet<>(); + for (String value : header) { + String normalized = value.toLowerCase(Locale.ROOT); + TableColumn column = columnsByName.get(normalized); + if (column == null || selected.contains(normalized)) { + throw invalidOperation(); + } + selected.add(normalized); + columns.add(column); + } + return columns; + } + + private static boolean containsTime(List<TableColumn> columns) { + for (TableColumn column : columns) { + if (column.isTime()) { + return true; + } + } + return false; + } + + private static String recordValues(CSVRecord record, List<TableColumn> columns) + throws SQLException { + if (record.size() != columns.size()) { + throw invalidOperation(); + } + StringBuilder builder = new StringBuilder("("); + for (int i = 0; i < columns.size(); i++) { + if (i > 0) { + builder.append(", "); + } + builder.append(sqlValue(columns.get(i), record.get(i))); + } + builder.append(')'); + return builder.toString(); + } + + private static String sqlValue(TableColumn column, String value) throws SQLException { + if (column.isTime()) { + if (isNull(value)) { + throw invalidOperation(); + } + return timestampValue(value); + } + if (NULL_MARKER.equals(value)) { + return "NULL"; + } + if (column.isTextual() || column.isDateLike()) { + return quote(value); + } + if (value.isEmpty()) { + throw invalidOperation(); + } + return value; + } + + private static boolean isNull(String value) { + return value == null || value.isEmpty() || NULL_MARKER.equals(value); + } + + private static String timestampValue(String value) { + if (value.matches("[-+]?\\d+")) { + return value; + } + return quote(value); + } + + private static String quote(String value) { + return "'" + value.replace("'", "''") + "'"; + } + + private static List<String> buildStatements( + String tablePath, List<TableColumn> columns, List<String> rows) { + List<String> statements = new ArrayList<>(); + for (int start = 0; start < rows.size(); start += INSERT_BATCH_SIZE) { + int end = Math.min(start + INSERT_BATCH_SIZE, rows.size()); + statements.add( + "INSERT INTO " + + identifierPath(tablePath) + + "(" + + columnList(columns) + + ") VALUES " + + join(rows.subList(start, end), ", ")); + } + return statements; + } + + private static String identifierPath(String tablePath) { + String[] parts = tablePath.split("\\.", -1); + StringBuilder builder = new StringBuilder(); + for (String part : parts) { + if (builder.length() > 0) { + builder.append('.'); + } + builder.append(identifier(part)); + } + return builder.toString(); + } + + private static String columnList(List<TableColumn> columns) { + List<String> names = new ArrayList<>(); + for (TableColumn column : columns) { + names.add(identifier(column.name)); + } + return join(names, ", "); + } + + private static String identifier(String value) { + if (value.matches("[A-Za-z_][A-Za-z0-9_]*")) { + return value; + } + return "\"" + value.replace("\"", "\"\"") + "\""; + } + + private static String join(List<String> values, String delimiter) { + StringBuilder builder = new StringBuilder(); + for (String value : values) { + if (builder.length() > 0) { + builder.append(delimiter); + } + builder.append(value); + } + return builder.toString(); + } + + private static SQLException invalidOperation() { + return new SQLException(INVALID_WRITE_OPERATION); + } + + private static class ParsedCsv { + private final List<TableColumn> columns; + private final List<String> rows; + + private ParsedCsv(List<TableColumn> columns, List<String> rows) { + this.columns = columns; + this.rows = rows; + } + } + + private static class TableColumn { + private final String name; + private final String type; + + private TableColumn(String name, String type) { + this.name = name; + this.type = type.toUpperCase(Locale.ROOT); + } + + private boolean isTime() { + return "TIME".equalsIgnoreCase(name); + } + + private boolean isTextual() { + return type.contains("STRING") || type.contains("TEXT") || type.contains("BLOB"); + } + + private boolean isDateLike() { + return type.contains("DATE") || type.contains("TIMESTAMP"); + } + } +} diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/provider/TableFilesystemMutationProvider.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/provider/TableFilesystemMutationProvider.java index d8220fee7f7..8a0e79ca58a 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/provider/TableFilesystemMutationProvider.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/provider/TableFilesystemMutationProvider.java @@ -64,6 +64,23 @@ public class TableFilesystemMutationProvider implements FilesystemMutationProvid executor.execute("ALTER TABLE " + toTablePath(source) + " RENAME TO " + tableName(target)); } + @Override + public void append(FsPath path, List<String> lines) throws SQLException { + if (!isDataFile(path)) { + throw invalidOperation(); + } + if (lines == null || lines.isEmpty()) { + return; + } + String tablePath = toTablePath(path); + List<String> statements = + TableCsvAppendPlanner.plan( + tablePath, executor.query("DESC " + tablePath + " DETAILS"), lines); + for (String statement : statements) { + executor.execute(statement); + } + } + private static SQLException invalidOperation() { return new SQLException(INVALID_WRITE_OPERATION); } diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/provider/UnsupportedFilesystemMutationProvider.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/provider/UnsupportedFilesystemMutationProvider.java index 9f4a08ceb01..2d061c32e4e 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/provider/UnsupportedFilesystemMutationProvider.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/fs/provider/UnsupportedFilesystemMutationProvider.java @@ -22,6 +22,7 @@ package org.apache.iotdb.cli.fs.provider; import org.apache.iotdb.cli.fs.path.FsPath; import java.sql.SQLException; +import java.util.List; public class UnsupportedFilesystemMutationProvider implements FilesystemMutationProvider { @@ -42,6 +43,11 @@ public class UnsupportedFilesystemMutationProvider implements FilesystemMutation throw unsupported(); } + @Override + public void append(FsPath path, List<String> lines) throws SQLException { + throw unsupported(); + } + private static SQLException unsupported() { return new SQLException(UNSUPPORTED); } diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/utils/JlineUtils.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/utils/JlineUtils.java index aa68d220e77..704e10f6600 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/utils/JlineUtils.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/utils/JlineUtils.java @@ -153,7 +153,8 @@ public class JlineUtils { if ("filesystem".equalsIgnoreCase(accessMode)) { return new StringsCompleter( "pwd", "ls", "ll", "cd", "stat", "cat", "head", "tail", "wc", "grep", "find", "less", - "more", "file", "du", "mkdir", "rm", "mv", "paste", "tree", "help", "exit", "quit"); + "more", "file", "du", "mkdir", "rm", "mv", "cut", "paste", "tee", "tree", "help", "exit", + "quit"); } return new StringsCompleter(SQL_KEYWORDS); } diff --git a/iotdb-client/cli/src/test/java/org/apache/iotdb/cli/fs/FilesystemShellTest.java b/iotdb-client/cli/src/test/java/org/apache/iotdb/cli/fs/FilesystemShellTest.java index 3911b65f382..5566022a31c 100644 --- a/iotdb-client/cli/src/test/java/org/apache/iotdb/cli/fs/FilesystemShellTest.java +++ b/iotdb-client/cli/src/test/java/org/apache/iotdb/cli/fs/FilesystemShellTest.java @@ -30,6 +30,7 @@ import org.apache.iotdb.cli.utils.CliContext; import org.jline.reader.Candidate; import org.jline.reader.Completer; +import org.jline.reader.LineReader; import org.jline.reader.ParsedLine; import org.jline.reader.impl.DefaultParser; import org.junit.Before; @@ -58,6 +59,7 @@ public class FilesystemShellTest { @Mock private FilesystemSchemaProvider provider; @Mock private FilesystemMutationProvider mutationProvider; + @Mock private LineReader lineReader; private ByteArrayOutputStream out; private FilesystemShell shell; @@ -202,6 +204,58 @@ public class FilesystemShellTest { .move(FsPath.absolute("/db1/table1.csv"), FsPath.absolute("/db1/table2.csv")); } + @Test + public void executeTeeRejectsReadOnlyMode() throws SQLException { + assertTrue(shell.execute("tee -a /db1/table1.csv")); + + assertTrue(out.toString().contains("tee: /db1/table1.csv: Read-only file system")); + verifyZeroInteractions(mutationProvider); + } + + @Test + public void executeTeeReadsInteractiveBufferUntilWriteQuit() throws SQLException { + CliContext ctx = shellContext(); + ctx.setLineReader(lineReader); + shell = new FilesystemShell(ctx, provider, mutationProvider, true); + when(lineReader.readLine("tee> ", null)).thenReturn("time,key,value", "1,spricoder,2.0", ":wq"); + + assertTrue(shell.execute("tee -a /db1/table1.csv")); + + verify(mutationProvider) + .append( + FsPath.absolute("/db1/table1.csv"), Arrays.asList("time,key,value", "1,spricoder,2.0")); + } + + @Test + public void executeTeeQuitWarnsBeforeDiscardingInteractiveBuffer() throws SQLException { + CliContext ctx = shellContext(); + ctx.setLineReader(lineReader); + shell = new FilesystemShell(ctx, provider, mutationProvider, true); + when(lineReader.readLine("tee> ", null)).thenReturn("time,key,value", ":q", ":q!"); + + assertTrue(shell.execute("tee -a /db1/table1.csv")); + + assertTrue(out.toString().contains("tee: use :wq to write or :q! to quit without writing")); + verifyZeroInteractions(mutationProvider); + } + + @Test + public void executeTeeNonInteractiveReadsStandardInputUntilEof() throws SQLException { + CliContext ctx = + new CliContext( + new ByteArrayInputStream("time,key,value\n1,spricoder,2.0\n".getBytes()), + new PrintStream(out), + System.err, + ExitType.EXCEPTION); + shell = new FilesystemShell(ctx, provider, mutationProvider, true); + + assertTrue(shell.executeNonInteractive("tee -a /db1/table1.csv")); + + verify(mutationProvider) + .append( + FsPath.absolute("/db1/table1.csv"), Arrays.asList("time,key,value", "1,spricoder,2.0")); + } + @Test public void executeTreePrintsChildrenUntilDepth() throws SQLException { when(provider.list(FsPath.absolute("/"))) diff --git a/iotdb-client/cli/src/test/java/org/apache/iotdb/cli/fs/command/FilesystemCommandParserTest.java b/iotdb-client/cli/src/test/java/org/apache/iotdb/cli/fs/command/FilesystemCommandParserTest.java index 8039335d797..eb97455c369 100644 --- a/iotdb-client/cli/src/test/java/org/apache/iotdb/cli/fs/command/FilesystemCommandParserTest.java +++ b/iotdb-client/cli/src/test/java/org/apache/iotdb/cli/fs/command/FilesystemCommandParserTest.java @@ -194,6 +194,26 @@ public class FilesystemCommandParserTest { FilesystemCommand.Type.INVALID, FilesystemCommandParser.parse("cut -f2,3").getType()); } + @Test + public void parseTeeAppendPath() { + FilesystemCommand command = FilesystemCommandParser.parse("tee -a /db1/table1.csv"); + + assertEquals(FilesystemCommand.Type.TEE, command.getType()); + assertEquals("-a", command.getOption()); + assertEquals("/db1/table1.csv", command.getPath()); + } + + @Test + public void parseTeeRequiresAppendOptionAndPath() { + assertEquals( + FilesystemCommand.Type.INVALID, + FilesystemCommandParser.parse("tee /db1/table1.csv").getType()); + assertEquals(FilesystemCommand.Type.INVALID, FilesystemCommandParser.parse("tee -a").getType()); + assertEquals( + FilesystemCommand.Type.INVALID, + FilesystemCommandParser.parse("tee -p /db1/table1.csv").getType()); + } + @Test public void parseWriteCommands() { FilesystemCommand mkdir = FilesystemCommandParser.parse("mkdir /db1"); diff --git a/iotdb-client/cli/src/test/java/org/apache/iotdb/cli/fs/provider/TableFilesystemMutationProviderTest.java b/iotdb-client/cli/src/test/java/org/apache/iotdb/cli/fs/provider/TableFilesystemMutationProviderTest.java index 2403bffffb3..2f23c1d4f1d 100644 --- a/iotdb-client/cli/src/test/java/org/apache/iotdb/cli/fs/provider/TableFilesystemMutationProviderTest.java +++ b/iotdb-client/cli/src/test/java/org/apache/iotdb/cli/fs/provider/TableFilesystemMutationProviderTest.java @@ -28,10 +28,12 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; import java.sql.SQLException; +import java.util.Arrays; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class TableFilesystemMutationProviderTest { @@ -98,6 +100,61 @@ public class TableFilesystemMutationProviderTest { provider.move(FsPath.absolute("/db1/table1.csv"), FsPath.absolute("/db2/table1.csv"))); } + @Test + public void appendCsvWithHeaderBuildsMultiRowInsert() throws SQLException { + mockTableSchema(); + + provider.append( + FsPath.absolute("/db1/table1.csv"), + Arrays.asList("time,key,value", "1,spricoder,2.0", "2,other,\\N")); + + verify(executor).query("DESC db1.table1 DETAILS"); + verify(executor) + .execute( + "INSERT INTO db1.table1(time, key, value) VALUES " + + "(1, 'spricoder', 2.0), (2, 'other', NULL)"); + } + + @Test + public void appendCsvWithoutHeaderUsesFullSchemaOrder() throws SQLException { + mockTableSchema(); + + provider.append(FsPath.absolute("/db1/table1.csv"), Arrays.asList("1,spricoder,2.0")); + + verify(executor) + .execute("INSERT INTO db1.table1(time, key, value) VALUES (1, 'spricoder', 2.0)"); + } + + @Test + public void appendCsvWithHeaderAllowsPartialColumns() throws SQLException { + mockTableSchema(); + + provider.append(FsPath.absolute("/db1/table1.csv"), Arrays.asList("time,value", "1,2.0")); + + verify(executor).execute("INSERT INTO db1.table1(time, value) VALUES (1, 2.0)"); + } + + @Test + public void appendRejectsSidecarAndMissingTime() throws SQLException { + assertInvalidOperation( + () -> provider.append(FsPath.absolute("/db1/table1.schema"), Arrays.asList("time", "1"))); + + mockTableSchema(); + assertInvalidOperation( + () -> provider.append(FsPath.absolute("/db1/table1.csv"), Arrays.asList("key", "a"))); + } + + private void mockTableSchema() throws SQLException { + when(executor.query("DESC db1.table1 DETAILS")) + .thenReturn( + org.apache.iotdb.cli.fs.sql.SqlRow.list( + org.apache.iotdb.cli.fs.sql.SqlRow.of( + "ColumnName", "time", "DataType", "TIMESTAMP"), + org.apache.iotdb.cli.fs.sql.SqlRow.of("ColumnName", "key", "DataType", "STRING"), + org.apache.iotdb.cli.fs.sql.SqlRow.of( + "ColumnName", "value", "DataType", "DOUBLE"))); + } + private static void assertInvalidOperation(SqlOperation operation) throws SQLException { try { operation.run();
