JingsongLi commented on code in PR #531: URL: https://github.com/apache/flink-table-store/pull/531#discussion_r1105228777
########## flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/AbstractActionBase.java: ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.connector.action; + +import org.apache.flink.table.store.file.catalog.Catalog; +import org.apache.flink.table.store.file.catalog.CatalogFactory; +import org.apache.flink.table.store.file.catalog.Identifier; +import org.apache.flink.table.store.fs.Path; +import org.apache.flink.table.store.options.CatalogOptions; +import org.apache.flink.table.store.options.Options; +import org.apache.flink.table.store.table.Table; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Abstract base of {@link Action}. 
*/ +public abstract class AbstractActionBase implements Action { Review Comment: `ActionBase` ########## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Identifier.java: ########## @@ -58,12 +65,31 @@ public static Identifier fromString(String fullName) { if (paths.length != 2) { throw new IllegalArgumentException( - String.format("Cannot get split '%s' to get database and table", fullName)); + String.format( + "Cannot get splits from '%s' to get database and table", fullName)); } return new Identifier(paths[0], paths[1]); } + public static Identifier fromPath(Path tablePath) { + return fromPath(tablePath.getPath()); + } + + public static Identifier fromPath(String tablePath) { + String[] paths = tablePath.split("/"); + if (paths.length < 2) { + throw new IllegalArgumentException( + String.format( + "Cannot get splits from '%s' to get database and table", tablePath)); Review Comment: Path '%s' is not a legal path, please use catalog table path: 'warehouse_path/your_database.db/your_table'. ########## docs/content/docs/how-to/writing-tables.md: ########## @@ -229,3 +229,37 @@ For more information of drop-partition, see {{< /tab >}} {{< /tabs >}} + +## Deleting from table + +Currently, Table Store supports deleting records via submitting the 'delete' job through `flink run`. + +{{< tabs "delete-from-table" >}} + +{{< tab "Flink Job" >}} + +Run the following command to submit a 'delete' job for the table. + +```bash +<FLINK_HOME>/bin/flink run \
    -c org.apache.flink.table.store.connector.action.FlinkActions \
    /path/to/flink-table-store-dist-{{< version >}}.jar \
    delete \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <table-name>
    [--where <filter_spec>]
+``` + Review Comment: Add an example to enrich `filter_spec`. 
########## flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/DeleteAction.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.connector.action; + +import org.apache.flink.api.java.utils.MultipleParameterTool; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.conversion.DataStructureConverter; +import org.apache.flink.table.data.conversion.DataStructureConverters; +import org.apache.flink.table.store.connector.FlinkCatalog; +import org.apache.flink.table.store.connector.sink.FlinkSinkBuilder; +import org.apache.flink.table.store.file.operation.Lock; +import org.apache.flink.table.store.fs.Path; +import org.apache.flink.table.store.table.FileStoreTable; +import org.apache.flink.table.store.table.SupportsWrite; +import 
org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.function.BiFunction; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.table.store.connector.action.Action.getTablePath; +import static org.apache.flink.table.store.file.catalog.Catalog.DEFAULT_DATABASE; + +/** Delete from table action for Flink. */ +public class DeleteAction extends AbstractActionBase { + + private static final Logger LOG = LoggerFactory.getLogger(DeleteAction.class); + + private final FlinkCatalog flinkCatalog; + + @Nullable private String filter; + + public DeleteAction(Path tablePath) { + super(tablePath); + flinkCatalog = new FlinkCatalog(catalog, "table-store", DEFAULT_DATABASE); + } + + public DeleteAction withFilter(String filter) { + this.filter = filter; + return this; + } + + public static Optional<Action> create(String[] args) { + LOG.info("Delete job args: {}", String.join(" ", args)); + + MultipleParameterTool params = MultipleParameterTool.fromArgs(args); + + if (params.has("help")) { + printHelp(); + return Optional.empty(); + } + + Path tablePath = getTablePath(params); + + if (tablePath == null) { + return Optional.empty(); + } + + DeleteAction action = new DeleteAction(tablePath); + + if (params.has("where")) { + String filter = params.get("where"); + if (filter == null) { + return Optional.empty(); + } + + action.withFilter(filter); + } + + return Optional.of(action); + } + + private static void printHelp() { + System.out.println("Action \"delete\" deletes data from a table."); + System.out.println(); + + System.out.println("Syntax:"); + System.out.println( + " delete --warehouse <warehouse-path> --database <database-name> " + + "--table <table-name> [--where <filter_spec>]"); + 
System.out.println(" delete --path <table-path> [--where <filter_spec>]"); + System.out.println(); + + System.out.println( + "The '--where <filter_spec>' part is equal to the 'WHERE' clause in SQL DELETE statement"); + System.out.println(); + + System.out.println("Examples:"); + System.out.println( + " delete --warehouse hdfs:///path/to/warehouse --database test_db --table test_table"); + System.out.println(" It's equal to 'DELETE FROM test_table' which deletes all records"); + System.out.println(); + System.out.println( + " delete --path hdfs:///path/to/warehouse/test_db.db/test_table --where id > (SELECT count(*) FROM employee)"); + System.out.println( + " It's equal to 'DELETE FROM test_table WHERE id > (SELECT count(*) FROM employee)"); + } + + @Override + public void run() throws Exception { + if (filter == null) { + LOG.debug("Run delete action with no filter."); + ((SupportsWrite) table) + .deleteWhere( + UUID.randomUUID().toString(), + new ArrayList<>(), + Lock.factory(catalog.lockFactory().orElse(null), identifier)); + } else { + LOG.debug("Run delete action with filter '{}'.", filter); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + StreamTableEnvironment tEnv = + StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode()); + + tEnv.registerCatalog(flinkCatalog.getName(), flinkCatalog); + tEnv.useCatalog(flinkCatalog.getName()); + + Table queriedTable = + tEnv.sqlQuery( + String.format( + "SELECT * FROM %s WHERE %s", + identifier.getEscapedFullName('`'), filter)); + + List<DataStructureConverter<Object, Object>> converters = + queriedTable.getResolvedSchema().getColumnDataTypes().stream() + .map(DataStructureConverters::getConverter) + .collect(Collectors.toList()); + + DataStream<RowData> dataStream = + tEnv.toChangelogStream(queriedTable) + .map( + row -> { + BiFunction<Integer, Row, Object> fieldConverter = Review Comment: For performance, do not use these in the critical path: - 
Don't use `BiFunction`. - Don't use java stream. Just use for loop. ########## flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/DeleteAction.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.store.connector.action; + +import org.apache.flink.api.java.utils.MultipleParameterTool; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.conversion.DataStructureConverter; +import org.apache.flink.table.data.conversion.DataStructureConverters; +import org.apache.flink.table.store.connector.FlinkCatalog; +import org.apache.flink.table.store.connector.sink.FlinkSinkBuilder; +import org.apache.flink.table.store.file.operation.Lock; +import org.apache.flink.table.store.fs.Path; +import org.apache.flink.table.store.table.FileStoreTable; +import org.apache.flink.table.store.table.SupportsWrite; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.function.BiFunction; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.table.store.connector.action.Action.getTablePath; +import static org.apache.flink.table.store.file.catalog.Catalog.DEFAULT_DATABASE; + +/** Delete from table action for Flink. 
*/ +public class DeleteAction extends AbstractActionBase { + + private static final Logger LOG = LoggerFactory.getLogger(DeleteAction.class); + + private final FlinkCatalog flinkCatalog; + + @Nullable private String filter; + + public DeleteAction(Path tablePath) { + super(tablePath); + flinkCatalog = new FlinkCatalog(catalog, "table-store", DEFAULT_DATABASE); + } + + public DeleteAction withFilter(String filter) { + this.filter = filter; + return this; + } + + public static Optional<Action> create(String[] args) { + LOG.info("Delete job args: {}", String.join(" ", args)); + + MultipleParameterTool params = MultipleParameterTool.fromArgs(args); + + if (params.has("help")) { + printHelp(); + return Optional.empty(); + } + + Path tablePath = getTablePath(params); + + if (tablePath == null) { + return Optional.empty(); + } + + DeleteAction action = new DeleteAction(tablePath); + + if (params.has("where")) { + String filter = params.get("where"); + if (filter == null) { + return Optional.empty(); + } + + action.withFilter(filter); + } + + return Optional.of(action); + } + + private static void printHelp() { + System.out.println("Action \"delete\" deletes data from a table."); + System.out.println(); + + System.out.println("Syntax:"); + System.out.println( + " delete --warehouse <warehouse-path> --database <database-name> " + + "--table <table-name> [--where <filter_spec>]"); + System.out.println(" delete --path <table-path> [--where <filter_spec>]"); + System.out.println(); + + System.out.println( + "The '--where <filter_spec>' part is equal to the 'WHERE' clause in SQL DELETE statement"); + System.out.println(); + + System.out.println("Examples:"); + System.out.println( + " delete --warehouse hdfs:///path/to/warehouse --database test_db --table test_table"); + System.out.println(" It's equal to 'DELETE FROM test_table' which deletes all records"); + System.out.println(); + System.out.println( + " delete --path hdfs:///path/to/warehouse/test_db.db/test_table --where id 
> (SELECT count(*) FROM employee)"); + System.out.println( + " It's equal to 'DELETE FROM test_table WHERE id > (SELECT count(*) FROM employee)"); + } + + @Override + public void run() throws Exception { + if (filter == null) { + LOG.debug("Run delete action with no filter."); Review Comment: I think we can throw exception here to alert users to use overwrite. `table.deleteWhere` has performance problem if the table is very large. ########## flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/DeleteAction.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.store.connector.action; + +import org.apache.flink.api.java.utils.MultipleParameterTool; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.conversion.DataStructureConverter; +import org.apache.flink.table.data.conversion.DataStructureConverters; +import org.apache.flink.table.store.connector.FlinkCatalog; +import org.apache.flink.table.store.connector.sink.FlinkSinkBuilder; +import org.apache.flink.table.store.file.operation.Lock; +import org.apache.flink.table.store.fs.Path; +import org.apache.flink.table.store.table.FileStoreTable; +import org.apache.flink.table.store.table.SupportsWrite; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.function.BiFunction; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.table.store.connector.action.Action.getTablePath; +import static org.apache.flink.table.store.file.catalog.Catalog.DEFAULT_DATABASE; + +/** Delete from table action for Flink. 
*/ +public class DeleteAction extends AbstractActionBase { + + private static final Logger LOG = LoggerFactory.getLogger(DeleteAction.class); + + private final FlinkCatalog flinkCatalog; + + @Nullable private String filter; + + public DeleteAction(Path tablePath) { + super(tablePath); + flinkCatalog = new FlinkCatalog(catalog, "table-store", DEFAULT_DATABASE); + } + + public DeleteAction withFilter(String filter) { + this.filter = filter; + return this; + } + + public static Optional<Action> create(String[] args) { + LOG.info("Delete job args: {}", String.join(" ", args)); + + MultipleParameterTool params = MultipleParameterTool.fromArgs(args); + + if (params.has("help")) { + printHelp(); + return Optional.empty(); + } + + Path tablePath = getTablePath(params); + + if (tablePath == null) { + return Optional.empty(); + } + + DeleteAction action = new DeleteAction(tablePath); + + if (params.has("where")) { + String filter = params.get("where"); + if (filter == null) { + return Optional.empty(); + } + + action.withFilter(filter); + } + + return Optional.of(action); + } + + private static void printHelp() { + System.out.println("Action \"delete\" deletes data from a table."); + System.out.println(); + + System.out.println("Syntax:"); + System.out.println( + " delete --warehouse <warehouse-path> --database <database-name> " + + "--table <table-name> [--where <filter_spec>]"); + System.out.println(" delete --path <table-path> [--where <filter_spec>]"); + System.out.println(); + + System.out.println( + "The '--where <filter_spec>' part is equal to the 'WHERE' clause in SQL DELETE statement"); + System.out.println(); + + System.out.println("Examples:"); + System.out.println( + " delete --warehouse hdfs:///path/to/warehouse --database test_db --table test_table"); + System.out.println(" It's equal to 'DELETE FROM test_table' which deletes all records"); + System.out.println(); + System.out.println( + " delete --path hdfs:///path/to/warehouse/test_db.db/test_table --where id 
> (SELECT count(*) FROM employee)"); + System.out.println( + " It's equal to 'DELETE FROM test_table WHERE id > (SELECT count(*) FROM employee)"); + } + + @Override + public void run() throws Exception { + if (filter == null) { + LOG.debug("Run delete action with no filter."); + ((SupportsWrite) table) + .deleteWhere( + UUID.randomUUID().toString(), + new ArrayList<>(), + Lock.factory(catalog.lockFactory().orElse(null), identifier)); + } else { + LOG.debug("Run delete action with filter '{}'.", filter); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); Review Comment: Not here, set in your tests. ########## flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/AbstractActionBase.java: ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.store.connector.action; + +import org.apache.flink.table.store.file.catalog.Catalog; +import org.apache.flink.table.store.file.catalog.CatalogFactory; +import org.apache.flink.table.store.file.catalog.Identifier; +import org.apache.flink.table.store.fs.Path; +import org.apache.flink.table.store.options.CatalogOptions; +import org.apache.flink.table.store.options.Options; +import org.apache.flink.table.store.table.Table; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Abstract base of {@link Action}. */ +public abstract class AbstractActionBase implements Action { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractActionBase.class); + + protected final Catalog catalog; + + protected final Identifier identifier; + + protected Table table; + + AbstractActionBase(Path tablePath) { Review Comment: We can introduce two constructor: 1. warehouse database tableName 2. tablePath -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
