twalthr commented on code in PR #24155:
URL: https://github.com/apache/flink/pull/24155#discussion_r1469750627


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableDescriptor.java:
##########
@@ -55,16 +55,19 @@ public class TableDescriptor {
 
     private final @Nullable Schema schema;
     private final Map<String, String> options;
+    private final CatalogTable.TableDistribution tableDistribution;

Review Comment:
   use a static import, or better, let's make it a top-level class in the `catalog` package.



##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java:
##########
@@ -46,4 +46,7 @@ public interface ParserResource {
 
     @Resources.BaseMessage("WITH DRAIN could only be used after WITH SAVEPOINT.")
     Resources.ExInst<ParseException> withDrainOnlyUsedWithSavepoint();
+
+    @Resources.BaseMessage("Bucket count must be a positive integer.")

Review Comment:
   we could also perform this check at a later layer so that we can also verify information coming from the catalog or a `TableDescriptor` that doesn't go through the SQL parser.
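   a rough sketch of what such a check could look like if it lived e.g. during catalog table resolution (method name and location are only illustrative, not part of this PR):
   ```
   // hypothetical helper, e.g. invoked while resolving the CatalogTable
   private static void validateBucketCount(CatalogTable.TableDistribution distribution) {
       distribution
               .getBucketCount()
               .filter(count -> count <= 0)
               .ifPresent(
                       count -> {
                           throw new ValidationException(
                                   String.format(
                                           "Invalid bucket count '%s'. The bucket count must be a positive integer.",
                                           count));
                       });
   }
   ```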



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableDescriptor.java:
##########
@@ -334,6 +352,12 @@ public Builder format(
             return this;
         }
 
+        /** Define which columns this table is distributed by. */
+        public Builder distributedBy(CatalogTable.TableDistribution tableDistribution) {

Review Comment:
   we should offer convenience methods on `TableDescriptor` (1-level API) because `TableDistribution` is a 2-level API (for people implementing a catalog):
   ```
   distributedByHash(String... bucketKeys)
   distributedByHash(int number, String... bucketKeys)
   ...
   ```
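   for illustration, such convenience methods could roughly delegate to the 2-level API like this (the field name `distribution` and the exact signatures are only a sketch):
   ```
   public Builder distributedByHash(String... bucketKeys) {
       this.distribution = CatalogTable.TableDistribution.ofHash(Arrays.asList(bucketKeys), null);
       return this;
   }

   public Builder distributedByHash(int numberOfBuckets, String... bucketKeys) {
       this.distribution =
               CatalogTable.TableDistribution.ofHash(Arrays.asList(bucketKeys), numberOfBuckets);
       return this;
   }
   ```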



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java:
##########
@@ -510,6 +512,191 @@ public void testMergingCreateTableLike() {
                                         partitionedBy("a", "f0"))));
     }
 
+    @Test
+    public void testMergingCreateTableLikeExcludingDistribution() {
+        Map<String, String> sourceProperties = new HashMap<>();
+        sourceProperties.put("format.type", "json");
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        Schema.newBuilder()
+                                .column("f0", DataTypes.INT().notNull())
+                                .column("f1", DataTypes.TIMESTAMP(3))
+                                .columnByExpression("f2", "`f0` + 12345")
+                                .watermark("f1", "`f1` - interval '1' second")
+                                .build(),
+                        null,
+                        Optional.of(
+                                CatalogTable.TableDistribution.ofHash(
+                                        Collections.singletonList("f0"), 3)),
+                        Arrays.asList("f0", "f1"),
+                        sourceProperties,
+                        null);
+
+        catalogManager.createTable(
+                catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false);
+
+        final String sql =
+                "create table derivedTable(\n"
+                        + "  a int,\n"
+                        + "  watermark for f1 as `f1` - interval '5' second\n"
+                        + ")\n"
+                        + "DISTRIBUTED BY (a, f0)\n"
+                        + "with (\n"
+                        + "  'connector.type' = 'kafka'"
+                        + ")\n"
+                        + "like sourceTable (\n"
+                        + "   EXCLUDING GENERATED\n"
+                        + "   EXCLUDING DISTRIBUTION\n"
+                        + "   EXCLUDING PARTITIONS\n"
+                        + "   OVERWRITING OPTIONS\n"
+                        + "   OVERWRITING WATERMARKS"
+                        + ")";
+        Operation operation = parseAndConvert(sql);
+
+        assertThat(operation)
+                .is(
+                        new HamcrestCondition<>(
+                                isCreateTableOperation(
+                                        withDistribution(
+                                                Optional.of(
+                                                        new CatalogTable.TableDistribution(
+                                                                CatalogTable.TableDistribution.Kind
+                                                                        .UNKNOWN,
+                                                                null,
+                                                                Arrays.asList("a", "f0")))),
+                                        withSchema(
+                                                Schema.newBuilder()
+                                                        .column("f0", DataTypes.INT().notNull())
+                                                        .column("f1", DataTypes.TIMESTAMP(3))
+                                                        .column("a", DataTypes.INT())
+                                                        .watermark(
+                                                                "f1", "`f1` - INTERVAL '5' SECOND")
+                                                        .build()),
+                                        withOptions(
+                                                entry("connector.type", "kafka"),
+                                                entry("format.type", "json")))));
+    }
+
+    @Test
+    public void testCreateTableValidDistribution() {
+        final String sql =
+                "create table derivedTable(\n" + "  a int\n" + ")\n" + "DISTRIBUTED BY (a)";
+        Operation operation = parseAndConvert(sql);
+        assertThat(operation)
+                .is(
+                        new HamcrestCondition<>(
+                                isCreateTableOperation(
+                                        withDistribution(
+                                                Optional.of(
+                                                        new CatalogTable.TableDistribution(
+                                                                CatalogTable.TableDistribution.Kind
+                                                                        .UNKNOWN,
+                                                                null,
+                                                                Collections.singletonList(
+                                                                        "a")))))));
+    }
+
+    @Test
+    public void testCreateTableInvalidDistribution() {
+        final String sql =
+                "create table derivedTable(\n" + "  a int\n" + ")\n" + "DISTRIBUTED BY (f3)";
+
+        assertThatThrownBy(() -> parseAndConvert(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Invalid bucket key 'f3'. A bucket key must reference a physical column in the schema. Available columns are: [a]");
+    }
+
+    // TODO Discuss this case.

Review Comment:
   remove the TODO and the commented-out code



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java:
##########
@@ -82,7 +85,29 @@ static CatalogTable of(
             List<String> partitionKeys,
             Map<String, String> options,
             @Nullable Long snapshot) {
-        return new DefaultCatalogTable(schema, comment, partitionKeys, options, snapshot);
+        return new DefaultCatalogTable(
+                schema, comment, partitionKeys, options, snapshot, Optional.empty());
+    }
+
+    /**
+     * Creates an instance of {@link CatalogTable} with a specific snapshot.
+     *
+     * @param schema unresolved schema
+     * @param comment optional comment
+     * @param distribution distribution of the table
+     * @param partitionKeys list of partition keys or an empty list if not partitioned
+     * @param options options to configure the connector
+     * @param snapshot table snapshot of the table
+     */
+    static CatalogTable of(

Review Comment:
   let's deprecate the other `of` methods then



##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java:
##########
@@ -77,6 +83,7 @@ public SqlCreateTable(
             SqlNodeList columnList,
             List<SqlTableConstraint> tableConstraints,
             SqlNodeList propertyList,
+            SqlDistribution sqlDistribution,

Review Comment:
   nit: call this variable `distribution`; the other variables also have no `Sql` prefix. Also update the other classes.



##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDistribution.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlNumericLiteral;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * Distribution statement in CREATE TABLE DDL, e.g. {@code DISTRIBUTED BY HASH(column1, column2)
+ * INTO BUCKETS 10}.
+ */
+public class SqlDistribution extends SqlCall {
+
+    private static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("DISTRIBUTED BY", SqlKind.OTHER);
+
+    private final String distributionKind;
+    private final SqlNodeList bucketColumns;
+    private final SqlNumericLiteral bucketCount;
+
+    public SqlDistribution(
+            SqlParserPos pos,
+            @Nullable String distributionKind,
+            @Nullable SqlNodeList bucketColumns,
+            @Nullable SqlNumericLiteral bucketCount) {
+        super(pos);
+        this.distributionKind = distributionKind;
+        this.bucketColumns = bucketColumns;
+        this.bucketCount = bucketCount;
+    }
+
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Override
+    public List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(bucketCount, bucketColumns);
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        writer.newlineAndIndent();
+
+        if (bucketColumns.size() == 0 && bucketCount != null) {
+            writer.keyword("DISTRIBUTED INTO");
+            bucketCount.unparse(writer, leftPrec, rightPrec);
+            writer.keyword("BUCKETS");
+            writer.newlineAndIndent();
+            return;
+        }
+
+        writer.keyword("DISTRIBUTED BY");
+        if (distributionKind != null) {
+            writer.print(distributionKind);
+        }
+        SqlWriter.Frame bucketFrame = writer.startList("(", ")");
+        bucketColumns.unparse(writer, leftPrec, rightPrec);
+        writer.endList(bucketFrame);
+
+        if (bucketCount != null) {
+            writer.keyword("INTO");
+            bucketCount.unparse(writer, leftPrec, rightPrec);
+            writer.keyword("BUCKETS");
+        }
+        writer.newlineAndIndent();
+    }
+
+    public String getDistributionKind() {
+        if (distributionKind == null) {
+            return "UNKNOWN";

Review Comment:
   let's return `Optional<String>` here instead and let other layers deal with 
mapping it to the enum
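   a rough sketch of what that could look like (only illustrative):
   ```
   public Optional<String> getDistributionKind() {
       return Optional.ofNullable(distributionKind);
   }
   ```
   the mapping to `TableDistribution.Kind` (and the validation of unsupported algorithms) would then happen in the operation converter.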



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##########
@@ -1331,6 +1331,25 @@ public ResolvedCatalogTable resolveCatalogTable(CatalogTable table) {
                         .filter(Column::isPhysical)
                         .map(Column::getName)
                         .collect(Collectors.toList());
+
+        table.getDistribution()
+                .ifPresent(
+                        distribution ->
+                                distribution
+                                        .getBucketKeys()
+                                        .forEach(
+                                                bucketKey -> {
+                                                    if (!physicalColumns.contains(bucketKey)) {
+                                                        throw new ValidationException(
+                                                                String.format(
+                                                                        "Invalid bucket key '%s'. A bucket key must "

Review Comment:
   slightly rephrase: `A bucket key for distribution must...` so that people 
can connect bucket key to DISTRIBUTED BY.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableDescriptor.java:
##########
@@ -101,6 +104,14 @@ public Map<String, String> getOptions() {
         return options;
     }
 
+    public CatalogTable.TableDistribution getTableDistribution() {
+        return tableDistribution;
+    }
+
+    public List<String> getBucketKeys() {

Review Comment:
   remove this esp. with `partitionKeys` ;-) 



##########
flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd:
##########
@@ -147,11 +149,14 @@
     "COMPILE"
     "COLUMNS"
     "DATABASES"
+    "DISTRIBUTED"
+    "DISTRIBUTION"
     "DRAIN"
     "ENFORCED"
     "ESTIMATED_COST"
     "EXTENDED"
     "FUNCTIONS"
+    "HASH"

Review Comment:
   HASH doesn't need to be a reserved keyword; it could be an identifier (similar to functions like `HASH()`). A later layer could check for the only two supported algorithms.
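   such a later-layer check could look roughly like this (method name and error message are just a sketch):
   ```
   // hypothetical mapping/validation outside the parser
   private static TableDistribution.Kind toDistributionKind(String identifier) {
       switch (identifier.toUpperCase(Locale.ROOT)) {
           case "HASH":
               return TableDistribution.Kind.HASH;
           case "RANGE":
               return TableDistribution.Kind.RANGE;
           default:
               throw new ValidationException(
                       String.format(
                               "Unsupported distribution algorithm '%s'. Supported algorithms are: HASH, RANGE.",
                               identifier));
       }
   }
   ```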



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java:
##########
@@ -144,4 +169,108 @@ default Map<String, String> toProperties() {
     default Optional<Long> getSnapshot() {
         return Optional.empty();
     }
+
+    /** Returns the distribution of the table if the {@code DISTRIBUTED} clause is defined. */
+    default Optional<TableDistribution> getDistribution() {
+        return Optional.empty();
+    }
+
+    /** Distribution specification. */
+    @PublicEvolving
+    class TableDistribution {
+
+        private final Kind kind;
+        private final @Nullable Integer bucketCount;
+        private final List<String> bucketKeys;
+
+        @PublicEvolving
+        public TableDistribution(
+                Kind kind, @Nullable Integer bucketCount, List<String> bucketKeys) {
+            this.kind = kind;
+            this.bucketCount = bucketCount;
+            this.bucketKeys = bucketKeys;
+        }
+
+        /** Connector-dependent distribution with a declared number of buckets. */
+        public static TableDistribution ofUnknown(int bucketCount) {
+            return new TableDistribution(Kind.UNKNOWN, bucketCount, Collections.emptyList());
+        }
+
+        /** Hash distribution over the given keys among the declared number of buckets. */
+        public static TableDistribution ofHash(
+                List<String> bucketKeys, @Nullable Integer bucketCount) {
+            return new TableDistribution(Kind.HASH, bucketCount, bucketKeys);
+        }
+
+        /** Range distribution over the given keys among the declared number of buckets. */
+        public static TableDistribution ofRange(
+                List<String> bucketKeys, @Nullable Integer bucketCount) {
+            return new TableDistribution(Kind.RANGE, bucketCount, bucketKeys);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            TableDistribution that = (TableDistribution) o;
+            return kind == that.kind
+                    && Objects.equals(bucketCount, that.bucketCount)
+                    && Objects.equals(bucketKeys, that.bucketKeys);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(kind, bucketCount, bucketKeys);
+        }
+
+        @PublicEvolving
+        public enum Kind {
+            UNKNOWN,
+            HASH,
+            RANGE
+        }
+
+        public Kind getKind() {
+            return kind;
+        }
+
+        public List<String> getBucketKeys() {
+            return bucketKeys;
+        }
+
+        public Optional<Integer> getBucketCount() {
+            return Optional.ofNullable(bucketCount);
+        }
+
+        public String toSqlString() {

Review Comment:
   for consistency with `Operation` and `LogicalType`, we should call this 
method `asSerializableString`
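   for illustration, the renamed method could serialize roughly like this (the output format, e.g. the quoting of keys, is an assumption based on the DDL syntax):
   ```
   // requires java.util.stream.Collectors
   public String asSerializableString() {
       final String keys =
               bucketKeys.stream().map(key -> "`" + key + "`").collect(Collectors.joining(", "));
       final String algorithm = kind == Kind.UNKNOWN ? "" : kind.name();
       final String buckets =
               getBucketCount().map(count -> " INTO " + count + " BUCKETS").orElse("");
       return "DISTRIBUTED BY " + algorithm + "(" + keys + ")" + buckets;
   }
   ```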



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java:
##########
@@ -88,19 +93,26 @@ public List<String> getPartitionKeys() {
         return partitionKeys;
     }
 
+    @Override
+    public Optional<TableDistribution> getDistribution() {
+        return tableDistribution;

Review Comment:
   nit: call the field just `distribution`



##########
flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd:
##########
@@ -520,6 +526,7 @@
   # Please keep the keyword in alphabetical order if new keyword is added.
   nonReservedKeywordsToAdd: [
     # not in core, added in Flink
+    "BUCKETS"

Review Comment:
   why is only BUCKETS added here?



##########
flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java:
##########
@@ -900,16 +900,168 @@ void testCreateTable() {
         sql(sql).ok(expected);
     }
 
+    String buildDistributionInput(final String distributionClause) {
+        return "CREATE TABLE tbl1 (\n"
+                + "  a bigint,\n"
+                + "  h varchar, \n"
+                + "  g as 2 * (a + 1), \n"
+                + "  ts as toTimestamp(b, 'yyyy-MM-dd HH:mm:ss'), \n"
+                + "  b varchar,\n"
+                + "  proc as PROCTIME(), \n"
+                + "  meta STRING METADATA, \n"
+                + "  my_meta STRING METADATA FROM 'meta', \n"
+                + "  my_meta STRING METADATA FROM 'meta' VIRTUAL, \n"

Review Comment:
   can we simplify the columns? we don't need to test all column types once again.
   3 physical columns (2 for a composite distribution key, 1 for payload) should do the job.
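   for example, something in this direction (column names are only a suggestion, assuming the distribution clauses reference `a` and `h`):
   ```
   String buildDistributionInput(final String distributionClause) {
       return "CREATE TABLE tbl1 (\n"
               + "  a BIGINT,\n" // distribution key 1
               + "  h VARCHAR,\n" // distribution key 2
               + "  b VARCHAR\n" // payload column
               + ")\n"
               + distributionClause;
   }
   ```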



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java:
##########
@@ -82,7 +85,29 @@ static CatalogTable of(
             List<String> partitionKeys,
             Map<String, String> options,
             @Nullable Long snapshot) {
-        return new DefaultCatalogTable(schema, comment, partitionKeys, options, snapshot);
+        return new DefaultCatalogTable(
+                schema, comment, partitionKeys, options, snapshot, Optional.empty());
+    }
+
+    /**
+     * Creates an instance of {@link CatalogTable} with a specific snapshot.
+     *
+     * @param schema unresolved schema
+     * @param comment optional comment
+     * @param distribution distribution of the table
+     * @param partitionKeys list of partition keys or an empty list if not partitioned
+     * @param options options to configure the connector
+     * @param snapshot table snapshot of the table
+     */
+    static CatalogTable of(

Review Comment:
   As written in the FLIP:
   ```
   CatalogTable.Builder and CatalogTable.newBuilder() instead of another overloaded CatalogTable.of() method.
   ```
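   a rough sketch of how such a builder could be used (method names beyond `newBuilder()` are not spelled out in the FLIP quote above and are only assumptions):
   ```
   CatalogTable table =
           CatalogTable.newBuilder()
                   .schema(schema)
                   .comment("my comment")
                   .distribution(TableDistribution.ofHash(Collections.singletonList("f0"), 3))
                   .partitionKeys(Arrays.asList("f0", "f1"))
                   .options(options)
                   .build();
   ```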



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsBucketing.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogTable.TableDistribution;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+
+import java.util.Set;
+
+/** Enables to write bucketed data into a {@link DynamicTableSink}. */
+@PublicEvolving
+public interface SupportsBucketing {
+    Set<TableDistribution.Kind> listAlgorithms();

Review Comment:
   ability interfaces need very good JavaDocs; add information about what will happen with the returned value
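   for example, something along these lines (the wording is only a sketch and should be aligned with the final planner behavior):
   ```
   /**
    * Enables bucketing for a {@link DynamicTableSink}.
    *
    * <p>Bucketing, declared via {@code DISTRIBUTED BY (...) INTO n BUCKETS}, groups the rows of a
    * table into a finite number of buckets based on one or more bucket keys.
    *
    * <p>The planner uses the set returned by {@link #listAlgorithms()} to validate the declared
    * distribution: if the declared algorithm is not contained in the returned set, the statement
    * is rejected with a validation error.
    */
   ```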



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/BucketingSpec.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.abilities.sink;
+
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * No properties. This only checks whether the interface is implemented again during deserialization.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonTypeName("Bucketing")

Review Comment:
   what is this property good for? other specs don't have it



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##########
@@ -1331,6 +1331,25 @@ public ResolvedCatalogTable resolveCatalogTable(CatalogTable table) {
                         .filter(Column::isPhysical)
                         .map(Column::getName)
                         .collect(Collectors.toList());
+
+        table.getDistribution()

Review Comment:
   the comment above does not match the implementation anymore



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java:
##########
@@ -37,6 +37,9 @@ public class DefaultCatalogTable implements CatalogTable {
 
     private final Schema schema;
     private final @Nullable String comment;
+

Review Comment:
   nit: remove empty lines around this line



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsBucketing.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogTable.TableDistribution;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+
+import java.util.Set;
+
+/** Enables to write bucketed data into a {@link DynamicTableSink}. */

Review Comment:
   ability interfaces need very good JavaDocs; explain the concept of bucketing here



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java:
##########
@@ -41,6 +41,7 @@
 /** A converter for {@link SqlReplaceTableAs}. */
 public class SqlReplaceTableAsConverter implements SqlNodeConverter<SqlReplaceTableAs> {
 
+    // TODO Update this converter if and when DISTRIBUTED BY is supported in REPLACE TABLE AS.

Review Comment:
   remove the TODO; the code base would be full of TODOs given the thousands of tickets we have in Flink



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java:
##########
@@ -434,6 +435,13 @@ private static RowKind parseRowKind(String rowKindShortString) {
                     .booleanType()
                     .defaultValue(false)
                    .withDescription("Option to determine whether to discard the late event.");
+
+    private static final ConfigOption<Boolean> SINK_BUCKET_COUNT_REQUIRED =
+            ConfigOptions.key("sink.bucket-count-required")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Option to determine whether or not to require the distribution bucket count");

Review Comment:
   nit: add empty line



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java:
##########
@@ -1030,6 +1037,38 @@ private static DataType fixSinkDataType(
                 TypeTransformations.toNullable());
     }
 
+    private static void validateBucketing(
+            String tableDebugName,
+            DynamicTableSink sink,
+            CatalogTable.TableDistribution tableDistribution) {
+        if (!(sink instanceof SupportsBucketing)) {
+            throw new TableException(
+                    String.format(
+                            "Table '%s' is a bucketed table, but the underlying %s doesn't "

Review Comment:
   `Table '%s' is distributed into buckets,` here and below to help users 
understand that it's about the `DISTRIBUTED BY` clause



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala:
##########
@@ -1114,6 +1118,61 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends TableITCaseBase {
     assertEquals(expectedDDL, row.getField(0).toString)
   }
 
+  @TestTemplate
+  def testCreateTableAndShowCreateTableWithDistributionAlgorithm(): Unit = {
+    val executedDDL =
+      """
+        |create temporary table TBL1 (
+        |  a bigint not null,

Review Comment:
   simplify test columns as commented above


