dawidwys commented on a change in pull request #11692:
URL: https://github.com/apache/flink/pull/11692#discussion_r411153299
##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsPartitioning.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+
+import java.util.Map;
+
+/**
+ * Enables to write partitioned data in a {@link DynamicTableSink}.
+ *
+ * <p>Partitions split the data stored in an external system into smaller portions that are identified
+ * by one or more string-based partition keys. A single partition is represented as a {@code Map < String, String >}
+ * which maps each partition key to a partition value. Partition keys and their order are defined by the
+ * catalog table.
+ *
+ * <p>For example, data can be partitioned by region and within a region partitioned by month. The order
+ * of the partition keys (in the example: first by region then by month) is defined by the catalog table. A
+ * list of partitions could be:
+ * <pre>
+ *   List(
+ *     ['region'='europe', 'month'='2020-01'],
+ *     ['region'='europe', 'month'='2020-02'],
+ *     ['region'='asia', 'month'='2020-01'],
+ *     ['region'='asia', 'month'='2020-02']
+ *   )
+ * </pre>
+ *
+ * <p>Given the following partitioned table:
+ * <pre>{@code
+ *   CREATE TABLE t (a INT, b STRING, c DOUBLE, region STRING, month STRING) PARTITION BY (region, month);
+ * }</pre>
+ *
+ * <p>We can insert data into <i>static table partitions</i> using the {@code INSERT INTO ... PARTITION} syntax:
+ * <pre>{@code
+ *   INSERT INTO t PARTITION (region='europe', month='2020-01') SELECT a, b, c FROM my_view;
+ * }</pre>
+ *
+ * <p>If all partition keys get a value assigned in the {@code PARTITION} clause, the operation is considered
+ * as an "insertion into a static partition". In the above example, the query result should be written
+ * into the static partition {@code region='europe', month='2020-01'} which will be passed by the planner
+ * into {@link #applyStaticPartition(Map)}.
+ *
+ * <p>Alternatively, we can insert data into <i>dynamic table partitions</i> using the SQL syntax:
+ * <pre>{@code
+ *   INSERT INTO t PARTITION (region='europe') SELECT a, b, c, month FROM another_view;
+ * }</pre>
+ *
+ * <p>If only a subset of all partition keys (a prefix part) get a value assigned in the {@code PARTITION}
+ * clause, the operation is considered as an "insertion into a dynamic partition". In the above example,
+ * the static partition part is {@code region='europe'} which will be passed by the planner into
+ * {@link #applyStaticPartition(Map)}. The remaining values for partition keys should be obtained from
+ * each individual record by the sink during runtime. In the example, the {@code month} field is the dynamic
+ * partition key.
+ */
+@PublicEvolving
+public interface SupportsPartitioning {
+
+	/**
+	 * Provides the static part of a partition.
+	 *
+	 * <p>A single partition maps each partition key to a partition value. Depending on the user-defined
+	 * statement, the partition might not include all partition keys.
+	 *
+	 * <p>See the documentation of {@link SupportsPartitioning} for more information.
+	 *
+	 * @param partition user-defined (possibly partial) static partition
+	 */
+	void applyStaticPartition(Map<String, String> partition);
+
+	/**
+	 * Returns whether data needs to be grouped by partition before it is consumed by the sink. By default,
+	 * this is not required from the runtime and records arrive in arbitrary partition order.
+	 *
+	 * <p>If this method returns true, the sink can expect that all records will be grouped by the partition
+	 * keys before consumed by the sink. In other words: The sink will receive all elements of one
+	 * partition and then all elements of another partition. Elements of different partitions will not

Review comment:
The benefits I see in the approach I suggested are as follows:
* The planner decides which mode is used, based on the information provided by the source. In the current interface it is the source that makes that decision, so the logic must be reimplemented in every source separately (I know this logic is rather simple, but nevertheless).
* It's more extensible: it is easier to add new modes in the future. I understand there is a low chance there will be new modes.
* It's more aligned with other interfaces which have `applyXXX` methods. In the current approach, `requiresPartitionGrouping` has at least two responsibilities: 1) it applies a mode (derived from the input parameter), and 2) it communicates the chosen mode to the planner.

Nevertheless, I don't feel strongly about this and I am also OK with the current interface, as it is a rather niche use case that probably applies only to the filesystem connector.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: [email protected]
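
For readers following the thread, the static/dynamic split described in the Javadoc, and the two responsibilities of `requiresPartitionGrouping` mentioned in the comment, can be sketched with a toy sink. This is only an illustration under stated assumptions: `SupportsPartitioningSketch`, `SketchFileSink`, and `resolvePartition` are hypothetical names and not Flink classes, the boolean parameter of `requiresPartitionGrouping` is inferred from the remark that the mode is "derived from the input parameter", and the grouping policy shown is just one possible choice.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Entry point for the sketch; mimics what a planner might do for
// INSERT INTO t PARTITION (region='europe') SELECT a, b, c, month FROM another_view;
class PartitioningSketch {
    public static void main(String[] args) {
        SketchFileSink sink = new SketchFileSink(List.of("region", "month"));
        sink.applyStaticPartition(Map.of("region", "europe"));
        System.out.println("grouping requested: " + sink.requiresPartitionGrouping(true));
        System.out.println(sink.resolvePartition(Map.of("a", "1", "month", "2020-01")));
    }
}

// Hypothetical stand-in for the interface under review; NOT the real Flink type.
interface SupportsPartitioningSketch {
    void applyStaticPartition(Map<String, String> partition);

    // Assumed signature: the caller states whether it can group records at all.
    boolean requiresPartitionGrouping(boolean supportsGrouping);
}

// Hypothetical sink that keeps the static part and fills the rest from each record.
class SketchFileSink implements SupportsPartitioningSketch {
    private final List<String> partitionKeys; // order as defined by the catalog table
    private Map<String, String> staticPartition = Map.of();

    SketchFileSink(List<String> partitionKeys) {
        this.partitionKeys = partitionKeys;
    }

    @Override
    public void applyStaticPartition(Map<String, String> partition) {
        this.staticPartition = Map.copyOf(partition);
    }

    @Override
    public boolean requiresPartitionGrouping(boolean supportsGrouping) {
        // Both responsibilities in one method: the sink picks a mode from the
        // input parameter and reports that choice back through the return value.
        // Assumed policy: ask for grouping only when some keys are dynamic.
        return supportsGrouping && staticPartition.size() < partitionKeys.size();
    }

    // Builds the full partition: static values first, dynamic values from the record.
    Map<String, String> resolvePartition(Map<String, String> record) {
        Map<String, String> full = new LinkedHashMap<>();
        for (String key : partitionKeys) {
            String value = staticPartition.containsKey(key)
                    ? staticPartition.get(key)
                    : record.get(key);
            if (value == null) {
                throw new IllegalArgumentException("No value for partition key: " + key);
            }
            full.put(key, value);
        }
        return full;
    }
}
```

With the partial `PARTITION (region='europe')` clause, the sketch resolves `month` from each record, which is the "dynamic partition" case; passing both keys to `applyStaticPartition` would make every record land in the same static partition, and the assumed policy would then not request grouping.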
