This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b3c5afe9 [INLONG-6160][Sort] Support dynamic partition for 
KafkaLoadNode (#6165)
1b3c5afe9 is described below

commit 1b3c5afe9492b0a1e4c9b85259fb2db597390b48
Author: Charles <[email protected]>
AuthorDate: Tue Oct 18 12:59:25 2022 +0800

    [INLONG-6160][Sort] Support dynamic partition for KafkaLoadNode (#6165)
---
 .../sort/protocol/constant/KafkaConstant.java      |  40 ++++++-
 .../sort/protocol/node/load/KafkaLoadNode.java     |  56 ++++++++--
 .../org/apache/inlong/sort/base/Constants.java     |  48 +++++++--
 .../base/format/AbstractDynamicSchemaFormat.java   |  52 ++++++++-
 .../base/format/CanalJsonDynamicSchemaFormat.java  |  23 +++-
 .../format/DebeziumJsonDynamicSchemaFormat.java    |  29 ++++-
 .../sort/base/format/JsonDynamicSchemaFormat.java  |  29 +++--
 .../format/CanalJsonDynamicSchemaFormatTest.java   |  16 +++
 .../DebeziumJsonDynamicSchemaFormatTest.java       |  17 +++
 .../kafka/partitioner/RawDataHashPartitioner.java  | 117 +++++++++++++++++++++
 .../sort/kafka/table/KafkaDynamicTableFactory.java |  50 ++++++++-
 .../inlong/sort/parser/KafkaLoadSqlParseTest.java  |  74 ++++++++++++-
 12 files changed, 512 insertions(+), 39 deletions(-)

diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/KafkaConstant.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/KafkaConstant.java
index 7ddd8be03..509040d6b 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/KafkaConstant.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/KafkaConstant.java
@@ -41,10 +41,48 @@ public class KafkaConstant {
 
     /**
      * upsert-kafka
-     * 
+     *
      * @see <a 
href="https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/upsert-kafka/";>
      *         Upsert Kafka</a>
      */
     public static final String UPSERT_KAFKA = "upsert-kafka-inlong";
 
+    /**
+     * The Raw Data Hash Partitioner is used to extract partition from raw 
data(bytes array):
+     * It needs a partitionPattern used to parse the pattern and a format 
'canal-jon' or 'debezium-json'
+     * to deserialization the raw data(bytes array)
+     * This partitioner will extract primary key from raw data as the 
partition key used hash if the 'partitionPattern'
+     * equals 'PRIMARY_KEY' else it will parse partition key from raw data.
+     */
+    public static final String RAW_HASH = "raw-hash";
+
+    /**
+     * The parallelism of sink
+     */
+    public static final String SINK_PARALLELISM = "sink.parallelism";
+
+    /**
+     * Ignore the changelog of sink
+     */
+    public static final String SINK_IGNORE_CHANGELOG = "sink.ignore.changelog";
+
+    /**
+     * The multiple format of sink
+     */
+    public static final String SINK_MULTIPLE_FORMAT = "sink.multiple.format";
+
+    /**
+     * The partitioner of sink
+     */
+    public static final String SINK_PARTITIONER = "sink.partitioner";
+
+    /**
+     * The topic pattern of sink
+     */
+    public static final String TOPIC_PATTERN = "topic-pattern";
+
+    /**
+     * The multiple partition pattern of sink
+     */
+    public static final String SINK_MULTIPLE_PARTITION_PATTERN = 
"sink.multiple.partition-pattern";
 }
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
index 1405c17e4..13b741217 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
@@ -50,6 +50,18 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import static org.apache.inlong.sort.protocol.constant.KafkaConstant.CONNECTOR;
+import static org.apache.inlong.sort.protocol.constant.KafkaConstant.KAFKA;
+import static 
org.apache.inlong.sort.protocol.constant.KafkaConstant.PROPERTIES_BOOTSTRAP_SERVERS;
+import static org.apache.inlong.sort.protocol.constant.KafkaConstant.RAW_HASH;
+import static 
org.apache.inlong.sort.protocol.constant.KafkaConstant.SINK_IGNORE_CHANGELOG;
+import static 
org.apache.inlong.sort.protocol.constant.KafkaConstant.SINK_MULTIPLE_FORMAT;
+import static 
org.apache.inlong.sort.protocol.constant.KafkaConstant.SINK_MULTIPLE_PARTITION_PATTERN;
+import static 
org.apache.inlong.sort.protocol.constant.KafkaConstant.SINK_PARALLELISM;
+import static 
org.apache.inlong.sort.protocol.constant.KafkaConstant.SINK_PARTITIONER;
+import static org.apache.inlong.sort.protocol.constant.KafkaConstant.TOPIC;
+import static 
org.apache.inlong.sort.protocol.constant.KafkaConstant.TOPIC_PATTERN;
+import static 
org.apache.inlong.sort.protocol.constant.KafkaConstant.UPSERT_KAFKA;
 
 /**
  * Kafka load node using kafka connectors provided by flink
@@ -81,6 +93,12 @@ public class KafkaLoadNode extends LoadNode implements 
InlongMetric, Metadata, S
     @Nullable
     @JsonProperty("sinkMultipleFormat")
     private Format sinkMultipleFormat;
+    @Nullable
+    @JsonProperty("sinkPartitioner")
+    private String sinkPartitioner;
+    @Nullable
+    @JsonProperty("partitionPattern")
+    private String partitionPattern;
 
     public KafkaLoadNode(@JsonProperty("id") String id,
             @JsonProperty("name") String name,
@@ -95,7 +113,8 @@ public class KafkaLoadNode extends LoadNode implements 
InlongMetric, Metadata, S
             @JsonProperty("properties") Map<String, String> properties,
             @JsonProperty("primaryKey") String primaryKey) {
         this(id, name, fields, fieldRelations, filters, filterStrategy, topic, 
bootstrapServers, format,
-                sinkParallelism, properties, primaryKey, null, null);
+                sinkParallelism, properties, primaryKey, null, null,
+                null, null);
     }
 
     @JsonCreator
@@ -112,7 +131,9 @@ public class KafkaLoadNode extends LoadNode implements 
InlongMetric, Metadata, S
             @JsonProperty("properties") Map<String, String> properties,
             @JsonProperty("primaryKey") String primaryKey,
             @Nullable @JsonProperty("sinkMultipleFormat") Format 
sinkMultipleFormat,
-            @Nullable @JsonProperty("topicPattern") String topicPattern) {
+            @Nullable @JsonProperty("topicPattern") String topicPattern,
+            @Nullable @JsonProperty("sinkPartitioner") String sinkPartitioner,
+            @Nullable @JsonProperty("partitionPattern") String 
partitionPattern) {
         super(id, name, fields, fieldRelations, filters, filterStrategy, 
sinkParallelism, properties);
         this.topic = Preconditions.checkNotNull(topic, "topic is null");
         this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, 
"bootstrapServers is null");
@@ -120,6 +141,13 @@ public class KafkaLoadNode extends LoadNode implements 
InlongMetric, Metadata, S
         this.primaryKey = primaryKey;
         this.sinkMultipleFormat = sinkMultipleFormat;
         this.topicPattern = topicPattern;
+        this.sinkPartitioner = sinkPartitioner;
+        if (RAW_HASH.equals(sinkPartitioner)) {
+            this.partitionPattern = 
Preconditions.checkNotNull(partitionPattern,
+                    "partitionPattern is null when the sinkPartitioner is 
'raw-hash'");
+        } else {
+            this.partitionPattern = partitionPattern;
+        }
     }
 
     @Override
@@ -135,31 +163,37 @@ public class KafkaLoadNode extends LoadNode implements 
InlongMetric, Metadata, S
     @Override
     public Map<String, String> tableOptions() {
         Map<String, String> options = super.tableOptions();
-        options.put("topic", topic);
-        options.put("properties.bootstrap.servers", bootstrapServers);
+        options.put(TOPIC, topic);
+        options.put(PROPERTIES_BOOTSTRAP_SERVERS, bootstrapServers);
         if (getSinkParallelism() != null) {
-            options.put("sink.parallelism", getSinkParallelism().toString());
+            options.put(SINK_PARALLELISM, getSinkParallelism().toString());
         }
         if (format instanceof JsonFormat || format instanceof AvroFormat
                 || format instanceof CsvFormat || format instanceof RawFormat) 
{
             if (StringUtils.isEmpty(this.primaryKey)) {
-                options.put("connector", "kafka-inlong");
-                options.put("sink.ignore.changelog", "true");
+                options.put(CONNECTOR, KAFKA);
+                options.put(SINK_IGNORE_CHANGELOG, "true");
                 options.putAll(format.generateOptions(false));
             } else {
-                options.put("connector", "upsert-kafka-inlong");
+                options.put(CONNECTOR, UPSERT_KAFKA);
                 options.putAll(format.generateOptions(true));
             }
             if (format instanceof RawFormat) {
                 if (sinkMultipleFormat != null) {
-                    options.put("sink.multiple.format", 
sinkMultipleFormat.identifier());
+                    options.put(SINK_MULTIPLE_FORMAT, 
sinkMultipleFormat.identifier());
                 }
                 if (StringUtils.isNotBlank(topicPattern)) {
-                    options.put("topic-pattern", topicPattern);
+                    options.put(TOPIC_PATTERN, topicPattern);
+                }
+                if (StringUtils.isNotBlank(sinkPartitioner)) {
+                    options.put(SINK_PARTITIONER, sinkPartitioner);
+                }
+                if (StringUtils.isNotBlank(partitionPattern)) {
+                    options.put(SINK_MULTIPLE_PARTITION_PATTERN, 
partitionPattern);
                 }
             }
         } else if (format instanceof CanalJsonFormat || format instanceof 
DebeziumJsonFormat) {
-            options.put("connector", "kafka-inlong");
+            options.put(CONNECTOR, KAFKA);
             options.putAll(format.generateOptions(false));
         } else {
             throw new IllegalArgumentException("kafka load Node format is 
IllegalArgument");
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 225985079..b6597303d 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -85,6 +85,26 @@ public final class Constants {
 
     public static final String INLONG_METRIC_STATE_NAME = 
"inlong-metric-states";
 
+    public static final String PK_NAMES = "pkNames";
+
+    public static final String DATA = "data";
+
+    public static final String AFTER = "after";
+
+    public static final String BEFORE = "before";
+
+    public static final String SOURCE = "source";
+
+    /**
+     * It is used for jdbc url filter for avoiding url attack
+     * see also in https://su18.org/post/jdbc-connection-url-attack/
+     */
+    public static final String AUTO_DESERIALIZE = "autoDeserialize";
+
+    public static final String AUTO_DESERIALIZE_TRUE = "autoDeserialize=true";
+
+    public static final String AUTO_DESERIALIZE_FALSE = 
"autoDeserialize=false";
+
     public static final ConfigOption<String> INLONG_METRIC =
             ConfigOptions.key("inlong.metric.labels")
                     .stringType()
@@ -112,14 +132,26 @@ public final class Constants {
                     .withDescription(
                             "The format of multiple sink, it represents the 
real format of the raw binary data");
 
-    /**
-     * It is used for jdbc url filter for avoiding url attack
-     * see also in https://su18.org/post/jdbc-connection-url-attack/
-     */
-    public static final String AUTO_DESERIALIZE = "autoDeserialize";
-
-    public static final String AUTO_DESERIALIZE_TRUE = "autoDeserialize=true";
+    public static final ConfigOption<String> SINK_MULTIPLE_DATABASE_PATTERN =
+            ConfigOptions.key("sink.multiple.database-pattern")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The option 
'sink.multiple.database-pattern' "
+                            + "is used extract database name from the raw 
binary data, "
+                            + "this is only used in the multiple sink writing 
scenario.");
 
-    public static final String AUTO_DESERIALIZE_FALSE = 
"autoDeserialize=false";
+    public static final ConfigOption<String> SINK_MULTIPLE_TABLE_PATTERN =
+            ConfigOptions.key("sink.multiple.table-pattern")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The option 'sink.multiple.table-pattern' 
"
+                            + "is used extract table name from the raw binary 
data, "
+                            + "this is only used in the multiple sink writing 
scenario.");
 
+    public static final ConfigOption<Boolean> SINK_MULTIPLE_ENABLE =
+            ConfigOptions.key("sink.multiple.enable")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("The option 'sink.multiple.enable' "
+                            + "is used to determine whether to support 
multiple sink writing, default is 'false'.");
 }
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java
index 397497133..a33d9b003 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java
@@ -40,18 +40,31 @@ public abstract class AbstractDynamicSchemaFormat<T> {
     public static final Pattern PATTERN = 
Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}", Pattern.CASE_INSENSITIVE);
 
     /**
-     * Extract value by key from the raw data
+     * Extract values by key from the raw data
      *
      * @param message The byte array of raw data
      * @param keys The key list that will be used to extract
      * @return The value list maps the keys
      * @throws IOException The exceptions may throws when extract
      */
-    public List<String> extract(byte[] message, String... keys) throws 
IOException {
+    public List<String> extractValues(byte[] message, String... keys) throws 
IOException {
+        if (keys == null || keys.length == 0) {
+            return new ArrayList<>();
+        }
+        return extractValues(deserialize(message), keys);
+    }
+
+    /**
+     * Extract values by key from the raw data
+     *
+     * @param data The raw data
+     * @param keys The key list that will be used to extract
+     * @return The value list maps the keys
+     */
+    public List<String> extractValues(T data, String... keys) {
         if (keys == null || keys.length == 0) {
             return new ArrayList<>();
         }
-        final T data = deserialize(message);
         List<String> values = new ArrayList<>(keys.length);
         for (String key : keys) {
             values.add(extract(data, key));
@@ -68,6 +81,39 @@ public abstract class AbstractDynamicSchemaFormat<T> {
      */
     public abstract String extract(T data, String key);
 
+    /**
+     * Extract primary key names
+     *
+     * @param data The raw data
+     * @return The primary key name list
+     */
+    public abstract List<String> extractPrimaryKeyNames(T data);
+
+    /**
+     * Extract primary key values
+     *
+     * @param message The byte array of raw data
+     * @return The values of primary key
+     * @throws IOException The exception may be thrown when executing
+     */
+    public List<String> extractPrimaryKeyValues(byte[] message) throws 
IOException {
+        return extractPrimaryKeyValues(deserialize(message));
+    }
+
+    /**
+     * Extract primary key values
+     *
+     * @param data The raw data
+     * @return The values of primary key
+     */
+    public List<String> extractPrimaryKeyValues(T data) {
+        List<String> pkNames = extractPrimaryKeyNames(data);
+        if (pkNames == null || pkNames.isEmpty()) {
+            return new ArrayList<>();
+        }
+        return extractValues(data, pkNames.toArray(new String[]{}));
+    }
+
     /**
      * Deserialize from byte array
      *
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java
index ffd994f2d..6a6b67d38 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java
@@ -19,6 +19,11 @@ package org.apache.inlong.sort.base.format;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 
+import java.util.ArrayList;
+import java.util.List;
+import static org.apache.inlong.sort.base.Constants.DATA;
+import static org.apache.inlong.sort.base.Constants.PK_NAMES;
+
 /**
  * Canal json dynamic format
  */
@@ -38,14 +43,26 @@ public class CanalJsonDynamicSchemaFormat extends 
JsonDynamicSchemaFormat {
     }
 
     @Override
-    protected JsonNode getPhysicalData(JsonNode root) {
-        JsonNode physicalData = root.get("data");
+    public JsonNode getPhysicalData(JsonNode root) {
+        JsonNode physicalData = root.get(DATA);
         if (physicalData != null) {
-            return root.get("data").get(0);
+            return root.get(DATA).get(0);
         }
         return null;
     }
 
+    @Override
+    public List<String> extractPrimaryKeyNames(JsonNode data) {
+        JsonNode pkNamesNode = data.get(PK_NAMES);
+        List<String> pkNames = new ArrayList<>();
+        if (pkNamesNode != null && pkNamesNode.isArray()) {
+            for (int i = 0; i < pkNamesNode.size(); i++) {
+                pkNames.add(pkNamesNode.get(i).asText());
+            }
+        }
+        return pkNames;
+    }
+
     /**
      * Get the identifier of this dynamic schema format
      *
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java
index c5298245e..11754226d 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java
@@ -19,6 +19,13 @@ package org.apache.inlong.sort.base.format;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 
+import java.util.ArrayList;
+import java.util.List;
+import static org.apache.inlong.sort.base.Constants.AFTER;
+import static org.apache.inlong.sort.base.Constants.BEFORE;
+import static org.apache.inlong.sort.base.Constants.PK_NAMES;
+import static org.apache.inlong.sort.base.Constants.SOURCE;
+
 /**
  * Debezium json dynamic format
  */
@@ -38,14 +45,30 @@ public class DebeziumJsonDynamicSchemaFormat extends 
JsonDynamicSchemaFormat {
     }
 
     @Override
-    protected JsonNode getPhysicalData(JsonNode root) {
-        JsonNode physicalData = root.get("after");
+    public JsonNode getPhysicalData(JsonNode root) {
+        JsonNode physicalData = root.get(AFTER);
         if (physicalData == null) {
-            physicalData = root.get("before");
+            physicalData = root.get(BEFORE);
         }
         return physicalData;
     }
 
+    @Override
+    public List<String> extractPrimaryKeyNames(JsonNode data) {
+        List<String> pkNames = new ArrayList<>();
+        JsonNode sourceNode = data.get(SOURCE);
+        if (sourceNode == null) {
+            return pkNames;
+        }
+        JsonNode pkNamesNode = sourceNode.get(PK_NAMES);
+        if (pkNamesNode != null && pkNamesNode.isArray()) {
+            for (int i = 0; i < pkNamesNode.size(); i++) {
+                pkNames.add(pkNamesNode.get(i).asText());
+            }
+        }
+        return pkNames;
+    }
+
     /**
      * Get the identifier of this dynamic schema format
      *
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
index a238b1369..b8be01e80 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
@@ -17,12 +17,15 @@
 
 package org.apache.inlong.sort.base.format;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.regex.Matcher;
 
 /**
@@ -43,19 +46,17 @@ public abstract class JsonDynamicSchemaFormat extends 
AbstractDynamicSchemaForma
     private final ObjectMapper objectMapper = new ObjectMapper();
 
     /**
-     * Extract value by key from the raw data
+     * Extract values by keys from the raw data
      *
-     * @param message The byte array of raw data
+     * @param root The raw data
      * @param keys The key list that will be used to extract
      * @return The value list maps the keys
-     * @throws IOException The exceptions may throws when extract
      */
     @Override
-    public List<String> extract(byte[] message, String... keys) throws 
IOException {
+    public List<String> extractValues(JsonNode root, String... keys) {
         if (keys == null || keys.length == 0) {
             return new ArrayList<>();
         }
-        final JsonNode root = deserialize(message);
         JsonNode physicalNode = getPhysicalData(root);
         List<String> values = new ArrayList<>(keys.length);
         if (physicalNode == null) {
@@ -164,5 +165,21 @@ public abstract class JsonDynamicSchemaFormat extends 
AbstractDynamicSchemaForma
      * @param root The json root node
      * @return The physical data node
      */
-    protected abstract JsonNode getPhysicalData(JsonNode root);
+    public abstract JsonNode getPhysicalData(JsonNode root);
+
+    /**
+     * Convert physical data to map
+     *
+     * @param root The json root node
+     * @return The map of physicalData
+     * @throws IOException The exception may be thrown when executing
+     */
+    public Map<String, String> physicalDataToMap(JsonNode root) throws 
IOException {
+        JsonNode physicalData = getPhysicalData(root);
+        if (physicalData == null) {
+            return new HashMap<>();
+        }
+        return objectMapper.convertValue(physicalData, new 
TypeReference<Map<String, String>>() {
+        });
+    }
 }
diff --git 
a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java
 
b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java
index 2d12a4bee..ffa45e8c2 100644
--- 
a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java
+++ 
b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java
@@ -18,8 +18,14 @@
 package org.apache.inlong.sort.base.format;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.junit.Assert;
+import org.junit.Test;
 
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -80,6 +86,16 @@ public class CanalJsonDynamicSchemaFormatTest extends 
DynamicSchemaFormatBaseTes
         return expectedValues;
     }
 
+    @Test
+    @SuppressWarnings({"unchecked"})
+    public void testExtractPrimaryKey() throws IOException {
+        JsonNode rootNode = (JsonNode) getDynamicSchemaFormat()
+                .deserialize(getSource().getBytes(StandardCharsets.UTF_8));
+        List<String> primaryKeys = 
getDynamicSchemaFormat().extractPrimaryKeyNames(rootNode);
+        List<String> values = getDynamicSchemaFormat().extractValues(rootNode, 
primaryKeys.toArray(new String[]{}));
+        Assert.assertEquals(values, Collections.singletonList("111"));
+    }
+
     @SuppressWarnings({"unchecked", "rawtypes"})
     @Override
     protected AbstractDynamicSchemaFormat getDynamicSchemaFormat() {
diff --git 
a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java
 
b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java
index 81c0e512e..bbd51dac6 100644
--- 
a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java
+++ 
b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java
@@ -18,8 +18,14 @@
 package org.apache.inlong.sort.base.format;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.junit.Assert;
+import org.junit.Test;
 
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -43,6 +49,7 @@ public class DebeziumJsonDynamicSchemaFormatTest extends 
DynamicSchemaFormatBase
                 + "    \"weight\": 5.15\n"
                 + "  },\n"
                 + "  \"source\": {\"version\": \"0.9.5.Final\",\n"
+                + "\"pkNames\":[\"id\", \"name\"],"
                 + "\t\"connector\": \"mysql\",\n"
                 + "\t\"name\": \"fullfillment\",\n"
                 + "\t\"server_id\" :1,\n"
@@ -71,6 +78,16 @@ public class DebeziumJsonDynamicSchemaFormatTest extends 
DynamicSchemaFormatBase
         return expectedValues;
     }
 
+    @Test
+    @SuppressWarnings({"unchecked"})
+    public void testExtractPrimaryKey() throws IOException {
+        JsonNode rootNode = (JsonNode) getDynamicSchemaFormat()
+                .deserialize(getSource().getBytes(StandardCharsets.UTF_8));
+        List<String> primaryKeys = 
getDynamicSchemaFormat().extractPrimaryKeyNames(rootNode);
+        List<String> values = getDynamicSchemaFormat().extractValues(rootNode, 
primaryKeys.toArray(new String[]{}));
+        Assert.assertEquals(values, Arrays.asList("111", "scooter"));
+    }
+
     @SuppressWarnings({"unchecked", "rawtypes"})
     @Override
     protected AbstractDynamicSchemaFormat getDynamicSchemaFormat() {
diff --git 
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/partitioner/RawDataHashPartitioner.java
 
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/partitioner/RawDataHashPartitioner.java
new file mode 100644
index 000000000..2f19e0489
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/partitioner/RawDataHashPartitioner.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.partitioner;
+
+import org.apache.commons.lang3.StringUtils;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * The Raw Data Hash Partitioner is used to extract partition from raw 
data(bytes array):
+ * It needs a partitionPattern used to parse the pattern and a format {@link 
RawDataHashPartitioner#sinkMultipleFormat}
+ * to deserialization the raw data(bytes array)
+ * This partitioner will extract primary key from raw data as the partition 
key used hash if the 'partitionPattern'
+ * equals 'PRIMARY_KEY' else it will parse partition key from raw data.
+ *
+ * @param <T>
+ */
+public class RawDataHashPartitioner<T> extends FlinkKafkaPartitioner<T> {
+
+    /**
+     * The primary key constant, this partitioner will extract primary key 
from raw data if the 'partitionPattern'
+     * equals 'PRIMARY_KEY'
+     */
+    public static final String PRIMARY_KEY = "PRIMARY_KEY";
+    private static final Logger LOG = 
LoggerFactory.getLogger(RawDataHashPartitioner.class);
+    private static final long serialVersionUID = 1L;
+    /**
+     * The partition pattern used to extract partition
+     */
+    private String partitionPattern;
+
+    /**
+     * The format used to deserialization the raw data(bytes array)
+     */
+    private String sinkMultipleFormat;
+
+    @SuppressWarnings({"rawtypes"})
+    private AbstractDynamicSchemaFormat dynamicSchemaFormat;
+
+    @Override
+    public void open(int parallelInstanceId, int parallelInstances) {
+        super.open(parallelInstanceId, parallelInstances);
+        dynamicSchemaFormat = 
DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
+    }
+
+    @Override
+    @SuppressWarnings({"unchecked"})
+    public int partition(T record, byte[] key, byte[] value, String 
targetTopic, int[] partitions) {
+        Preconditions.checkArgument(
+                partitions != null && partitions.length > 0,
+                "Partitions of the target topic is empty.");
+        int partition = 0;
+        try {
+            String partitionKey;
+            if (PRIMARY_KEY.equals(partitionPattern)) {
+                List<String> values = 
dynamicSchemaFormat.extractPrimaryKeyValues(value);
+                if (values == null || values.isEmpty()) {
+                    return partition;
+                }
+                partitionKey = StringUtils.join(values, "");
+            } else {
+                partitionKey = dynamicSchemaFormat.parse(value, 
partitionPattern);
+            }
+            partition = partitions[(partitionKey.hashCode() & 
Integer.MAX_VALUE) % partitions.length];
+        } catch (Exception e) {
+            LOG.warn("Extract partition failed", e);
+        }
+        return partition;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        return o instanceof RawDataHashPartitioner;
+    }
+
+    @Override
+    public int hashCode() {
+        return RawDataHashPartitioner.class.hashCode();
+    }
+
+    public String getPartitionPattern() {
+        return partitionPattern;
+    }
+
+    public void setPartitionPattern(String partitionPattern) {
+        this.partitionPattern = partitionPattern;
+    }
+
+    public String getSinkMultipleFormat() {
+        return sinkMultipleFormat;
+    }
+
+    public void setSinkMultipleFormat(String sinkMultipleFormat) {
+        this.sinkMultipleFormat = sinkMultipleFormat;
+    }
+}
diff --git 
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
 
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
index 49ca0a56a..480d55948 100644
--- 
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+++ 
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
@@ -53,6 +54,7 @@ import org.apache.flink.types.RowKind;
 import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
 import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
 import org.apache.inlong.sort.kafka.KafkaDynamicSink;
+import org.apache.inlong.sort.kafka.partitioner.RawDataHashPartitioner;
 
 import javax.annotation.Nullable;
 import java.time.Duration;
@@ -89,11 +91,11 @@ import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.VAL
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.autoCompleteSchemaRegistrySubject;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.createKeyFormatProjection;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.createValueFormatProjection;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getFlinkKafkaPartitioner;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getKafkaProperties;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getSinkSemantic;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getStartupOptions;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableSourceOptions;
+import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
 import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
@@ -111,6 +113,14 @@ public class KafkaDynamicTableFactory implements 
DynamicTableSourceFactory, Dyna
 
     public static final String IDENTIFIER = "kafka-inlong";
 
+    public static final String SINK_PARTITIONER_VALUE_RAW_HASH = "raw-hash";
+
+    public static final ConfigOption<String> SINK_MULTIPLE_PARTITION_PATTERN =
+            ConfigOptions.key("sink.multiple.partition-pattern")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("option 'sink.multiple.partition-pattern' 
used when the partitioner is raw-hash.");
+
     private static final Set<String> SINK_SEMANTIC_ENUMS =
             new HashSet<>(
                     Arrays.asList(
@@ -206,6 +216,19 @@ public class KafkaDynamicTableFactory implements 
DynamicTableSourceFactory, Dyna
                         throw new ValidationException(
                                 "Currently 'round-robin' partitioner only 
works "
                                         + "when option 'key.fields' is not 
specified.");
+                    } else if 
(SINK_PARTITIONER_VALUE_RAW_HASH.equals(partitioner)
+                            || 
"org.apache.inlong.sort.kafka.partitioner.RawDataHashPartitioner".equals(partitioner))
 {
+                        boolean invalid = 
!"raw".equals(tableOptions.getOptional(FORMAT).orElse(null))
+                                || 
!tableOptions.getOptional(SINK_MULTIPLE_FORMAT).isPresent()
+                                || 
!tableOptions.getOptional(SINK_MULTIPLE_PARTITION_PATTERN).isPresent()
+                                || 
tableOptions.getOptional(SINK_MULTIPLE_FORMAT).get().isEmpty()
+                                || 
tableOptions.getOptional(SINK_MULTIPLE_PARTITION_PATTERN).get().isEmpty();
+                        if (invalid) {
+                            throw new ValidationException(
+                                    "Currently 'raw-hash' partitioner only 
works "
+                                            + "when option 'format' is 'raw' 
and option 'sink.multiple.format' "
+                                            + "and 
'sink.multiple.partition-pattern' is specified.");
+                        }
                     } else if (partitioner.isEmpty()) {
                         throw new ValidationException(
                                 String.format(
@@ -215,6 +238,30 @@ public class KafkaDynamicTableFactory implements 
DynamicTableSourceFactory, Dyna
                 });
     }
 
+    private Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
+            ReadableConfig tableOptions, ClassLoader classLoader) {
+        if (tableOptions.getOptional(SINK_PARTITIONER).isPresent()
+                && 
SINK_PARTITIONER_VALUE_RAW_HASH.equals(tableOptions.getOptional(SINK_PARTITIONER).get()))
 {
+            RawDataHashPartitioner<RowData> rawHashPartitioner = new 
RawDataHashPartitioner<>();
+            
rawHashPartitioner.setSinkMultipleFormat(tableOptions.getOptional(SINK_MULTIPLE_FORMAT).orElse(null));
+            
rawHashPartitioner.setPartitionPattern(tableOptions.getOptional(SINK_MULTIPLE_PARTITION_PATTERN)
+                    .orElse(null));
+            return Optional.of(rawHashPartitioner);
+        }
+        Optional<FlinkKafkaPartitioner<RowData>> partitioner = KafkaOptions
+                .getFlinkKafkaPartitioner(tableOptions, classLoader);
+        if (partitioner.isPresent()) {
+            if (partitioner.get() instanceof RawDataHashPartitioner) {
+                RawDataHashPartitioner<RowData> rawHashPartitioner =
+                        (RawDataHashPartitioner<RowData>) partitioner.get();
+                
rawHashPartitioner.setSinkMultipleFormat(tableOptions.getOptional(SINK_MULTIPLE_FORMAT).orElse(null));
+                
rawHashPartitioner.setPartitionPattern(tableOptions.getOptional(SINK_MULTIPLE_PARTITION_PATTERN)
+                        .orElse(null));
+            }
+        }
+        return partitioner;
+    }
+
     private void validateSinkSemantic(ReadableConfig tableOptions) {
         tableOptions
                 .getOptional(SINK_SEMANTIC)
@@ -263,6 +310,7 @@ public class KafkaDynamicTableFactory implements 
DynamicTableSourceFactory, Dyna
         options.add(INLONG_METRIC);
         options.add(INLONG_AUDIT);
         options.add(SINK_MULTIPLE_FORMAT);
+        options.add(SINK_MULTIPLE_PARTITION_PATTERN);
         return options;
     }
 
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java
index 783f1f4ec..1c3500563 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java
@@ -50,7 +50,8 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
- * Test for kafka load sql parse
+ * Test for {@link KafkaLoadNode} sql parse
+ * and raw hash partitioner {@link 
org.apache.inlong.sort.kafka.partitioner.RawDataHashPartitioner}
  */
 public class KafkaLoadSqlParseTest extends AbstractTestBase {
 
@@ -93,7 +94,19 @@ public class KafkaLoadSqlParseTest extends AbstractTestBase {
                         new FieldInfo("raw", new VarBinaryFormatInfo())));
         return new KafkaLoadNode("2", "kafka_output", fields, relations, null, 
null,
                 "topic_output", "localhost:9092", new RawFormat(), null,
-                null, null, new CanalJsonFormat(), "${database}_${table}");
+                null, null, new CanalJsonFormat(), "${database}_${table}",
+                null, null);
+    }
+
+    private KafkaLoadNode buildKafkaLoadNodeWithDynamicPartition(String 
pattern) {
+        List<FieldInfo> fields = Collections.singletonList(new 
FieldInfo("raw", new VarBinaryFormatInfo()));
+        List<FieldRelation> relations = Collections
+                .singletonList(new FieldRelation(new FieldInfo("raw", new 
VarBinaryFormatInfo()),
+                        new FieldInfo("raw", new VarBinaryFormatInfo())));
+        return new KafkaLoadNode("2", "kafka_output", fields, relations, null, 
null,
+                "topic_output", "localhost:9092", new RawFormat(), null,
+                null, null, new CanalJsonFormat(), null,
+                "raw-hash", pattern);
     }
 
     /**
@@ -137,7 +150,6 @@ public class KafkaLoadSqlParseTest extends AbstractTestBase 
{
         Assert.assertTrue(result.tryExecute());
     }
 
-
     /**
      * Test kafka to kafka with dynamic topic
      *
@@ -165,4 +177,60 @@ public class KafkaLoadSqlParseTest extends 
AbstractTestBase {
         ParseResult result = parser.parse();
         Assert.assertTrue(result.tryExecute());
     }
+
+    /**
+     * Test kafka to kafka with dynamic partition
+     *
+     * @throws Exception The exception may be thrown when executing
+     */
+    @Test
+    public void testKafkaDynamicPartitionParse() throws Exception {
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        env.disableOperatorChaining();
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
+        Node inputNode = buildKafkaExtractNode();
+        Node outputNode = 
buildKafkaLoadNodeWithDynamicPartition("${database}_${table}");
+        StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, 
outputNode),
+                
Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+                        Collections.singletonList(outputNode))));
+        GroupInfo groupInfo = new GroupInfo("1", 
Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, 
groupInfo);
+        ParseResult result = parser.parse();
+        Assert.assertTrue(result.tryExecute());
+    }
+
+    /**
+     * Test kafka to kafka with dynamic partition based on hash of primary key
+     *
+     * @throws Exception The exception may be thrown when executing
+     */
+    @Test
+    public void testKafkaDynamicPartitionWithPrimaryKey() throws Exception {
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        env.disableOperatorChaining();
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
+        Node inputNode = buildKafkaExtractNode();
+        Node outputNode = 
buildKafkaLoadNodeWithDynamicPartition("PRIMARY_KEY");
+        StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, 
outputNode),
+                
Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+                        Collections.singletonList(outputNode))));
+        GroupInfo groupInfo = new GroupInfo("1", 
Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, 
groupInfo);
+        ParseResult result = parser.parse();
+        Assert.assertTrue(result.tryExecute());
+    }
 }


Reply via email to