This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 1b3c5afe9 [INLONG-6160][Sort] Support dynamic partition for KafkaLoadNode (#6165)
1b3c5afe9 is described below
commit 1b3c5afe9492b0a1e4c9b85259fb2db597390b48
Author: Charles <[email protected]>
AuthorDate: Tue Oct 18 12:59:25 2022 +0800
[INLONG-6160][Sort] Support dynamic partition for KafkaLoadNode (#6165)
---
.../sort/protocol/constant/KafkaConstant.java | 40 ++++++-
.../sort/protocol/node/load/KafkaLoadNode.java | 56 ++++++++--
.../org/apache/inlong/sort/base/Constants.java | 48 +++++++--
.../base/format/AbstractDynamicSchemaFormat.java | 52 ++++++++-
.../base/format/CanalJsonDynamicSchemaFormat.java | 23 +++-
.../format/DebeziumJsonDynamicSchemaFormat.java | 29 ++++-
.../sort/base/format/JsonDynamicSchemaFormat.java | 29 +++--
.../format/CanalJsonDynamicSchemaFormatTest.java | 16 +++
.../DebeziumJsonDynamicSchemaFormatTest.java | 17 +++
.../kafka/partitioner/RawDataHashPartitioner.java | 117 +++++++++++++++++++++
.../sort/kafka/table/KafkaDynamicTableFactory.java | 50 ++++++++-
.../inlong/sort/parser/KafkaLoadSqlParseTest.java | 74 ++++++++++++-
12 files changed, 512 insertions(+), 39 deletions(-)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/KafkaConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/KafkaConstant.java
index 7ddd8be03..509040d6b 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/KafkaConstant.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/KafkaConstant.java
@@ -41,10 +41,48 @@ public class KafkaConstant {
/**
* upsert-kafka
- *
+ *
* @see <a href="https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/upsert-kafka/">
* Upsert Kafka</a>
*/
public static final String UPSERT_KAFKA = "upsert-kafka-inlong";
+ /**
+ * The raw data hash partitioner is used to extract the partition from the raw data (byte array).
+ * It needs a 'partitionPattern' used to parse the partition key and a format ('canal-json' or 'debezium-json')
+ * to deserialize the raw data (byte array).
+ * If the 'partitionPattern' equals 'PRIMARY_KEY', this partitioner extracts the primary key from the raw data
+ * and hashes it as the partition key; otherwise it parses the partition key from the raw data.
+ */
+ public static final String RAW_HASH = "raw-hash";
+
+ /**
+ * The parallelism of sink
+ */
+ public static final String SINK_PARALLELISM = "sink.parallelism";
+
+ /**
+ * Ignore the changelog of sink
+ */
+ public static final String SINK_IGNORE_CHANGELOG = "sink.ignore.changelog";
+
+ /**
+ * The multiple format of sink
+ */
+ public static final String SINK_MULTIPLE_FORMAT = "sink.multiple.format";
+
+ /**
+ * The partitioner of sink
+ */
+ public static final String SINK_PARTITIONER = "sink.partitioner";
+
+ /**
+ * The topic pattern of sink
+ */
+ public static final String TOPIC_PATTERN = "topic-pattern";
+
+ /**
+ * The multiple partition pattern of sink
+ */
+ public static final String SINK_MULTIPLE_PARTITION_PATTERN = "sink.multiple.partition-pattern";
}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
index 1405c17e4..13b741217 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
@@ -50,6 +50,18 @@ import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import static org.apache.inlong.sort.protocol.constant.KafkaConstant.CONNECTOR;
+import static org.apache.inlong.sort.protocol.constant.KafkaConstant.KAFKA;
+import static org.apache.inlong.sort.protocol.constant.KafkaConstant.PROPERTIES_BOOTSTRAP_SERVERS;
+import static org.apache.inlong.sort.protocol.constant.KafkaConstant.RAW_HASH;
+import static org.apache.inlong.sort.protocol.constant.KafkaConstant.SINK_IGNORE_CHANGELOG;
+import static org.apache.inlong.sort.protocol.constant.KafkaConstant.SINK_MULTIPLE_FORMAT;
+import static org.apache.inlong.sort.protocol.constant.KafkaConstant.SINK_MULTIPLE_PARTITION_PATTERN;
+import static org.apache.inlong.sort.protocol.constant.KafkaConstant.SINK_PARALLELISM;
+import static org.apache.inlong.sort.protocol.constant.KafkaConstant.SINK_PARTITIONER;
+import static org.apache.inlong.sort.protocol.constant.KafkaConstant.TOPIC;
+import static org.apache.inlong.sort.protocol.constant.KafkaConstant.TOPIC_PATTERN;
+import static org.apache.inlong.sort.protocol.constant.KafkaConstant.UPSERT_KAFKA;
/**
* Kafka load node using kafka connectors provided by flink
@@ -81,6 +93,12 @@ public class KafkaLoadNode extends LoadNode implements InlongMetric, Metadata, S
@Nullable
@JsonProperty("sinkMultipleFormat")
private Format sinkMultipleFormat;
+ @Nullable
+ @JsonProperty("sinkPartitioner")
+ private String sinkPartitioner;
+ @Nullable
+ @JsonProperty("partitionPattern")
+ private String partitionPattern;
public KafkaLoadNode(@JsonProperty("id") String id,
@JsonProperty("name") String name,
@@ -95,7 +113,8 @@ public class KafkaLoadNode extends LoadNode implements InlongMetric, Metadata, S
@JsonProperty("properties") Map<String, String> properties,
@JsonProperty("primaryKey") String primaryKey) {
this(id, name, fields, fieldRelations, filters, filterStrategy, topic, bootstrapServers, format,
- sinkParallelism, properties, primaryKey, null, null);
+ sinkParallelism, properties, primaryKey, null, null,
+ null, null);
}
@JsonCreator
@@ -112,7 +131,9 @@ public class KafkaLoadNode extends LoadNode implements InlongMetric, Metadata, S
@JsonProperty("properties") Map<String, String> properties,
@JsonProperty("primaryKey") String primaryKey,
@Nullable @JsonProperty("sinkMultipleFormat") Format
sinkMultipleFormat,
- @Nullable @JsonProperty("topicPattern") String topicPattern) {
+ @Nullable @JsonProperty("topicPattern") String topicPattern,
+ @Nullable @JsonProperty("sinkPartitioner") String sinkPartitioner,
+ @Nullable @JsonProperty("partitionPattern") String
partitionPattern) {
super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
this.topic = Preconditions.checkNotNull(topic, "topic is null");
this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "bootstrapServers is null");
@@ -120,6 +141,13 @@ public class KafkaLoadNode extends LoadNode implements InlongMetric, Metadata, S
this.primaryKey = primaryKey;
this.sinkMultipleFormat = sinkMultipleFormat;
this.topicPattern = topicPattern;
+ this.sinkPartitioner = sinkPartitioner;
+ if (RAW_HASH.equals(sinkPartitioner)) {
+ this.partitionPattern = Preconditions.checkNotNull(partitionPattern,
+ "partitionPattern is null when the sinkPartitioner is 'raw-hash'");
+ } else {
+ this.partitionPattern = partitionPattern;
+ }
}
@Override
@@ -135,31 +163,37 @@ public class KafkaLoadNode extends LoadNode implements InlongMetric, Metadata, S
@Override
public Map<String, String> tableOptions() {
Map<String, String> options = super.tableOptions();
- options.put("topic", topic);
- options.put("properties.bootstrap.servers", bootstrapServers);
+ options.put(TOPIC, topic);
+ options.put(PROPERTIES_BOOTSTRAP_SERVERS, bootstrapServers);
if (getSinkParallelism() != null) {
- options.put("sink.parallelism", getSinkParallelism().toString());
+ options.put(SINK_PARALLELISM, getSinkParallelism().toString());
}
if (format instanceof JsonFormat || format instanceof AvroFormat
|| format instanceof CsvFormat || format instanceof RawFormat) {
if (StringUtils.isEmpty(this.primaryKey)) {
- options.put("connector", "kafka-inlong");
- options.put("sink.ignore.changelog", "true");
+ options.put(CONNECTOR, KAFKA);
+ options.put(SINK_IGNORE_CHANGELOG, "true");
options.putAll(format.generateOptions(false));
} else {
- options.put("connector", "upsert-kafka-inlong");
+ options.put(CONNECTOR, UPSERT_KAFKA);
options.putAll(format.generateOptions(true));
}
if (format instanceof RawFormat) {
if (sinkMultipleFormat != null) {
- options.put("sink.multiple.format",
sinkMultipleFormat.identifier());
+ options.put(SINK_MULTIPLE_FORMAT,
sinkMultipleFormat.identifier());
}
if (StringUtils.isNotBlank(topicPattern)) {
- options.put("topic-pattern", topicPattern);
+ options.put(TOPIC_PATTERN, topicPattern);
+ }
+ if (StringUtils.isNotBlank(sinkPartitioner)) {
+ options.put(SINK_PARTITIONER, sinkPartitioner);
+ }
+ if (StringUtils.isNotBlank(partitionPattern)) {
+ options.put(SINK_MULTIPLE_PARTITION_PATTERN, partitionPattern);
}
}
} else if (format instanceof CanalJsonFormat || format instanceof DebeziumJsonFormat) {
- options.put("connector", "kafka-inlong");
+ options.put(CONNECTOR, KAFKA);
options.putAll(format.generateOptions(false));
} else {
throw new IllegalArgumentException("kafka load Node format is IllegalArgument");
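For reference, a raw-format KafkaLoadNode configured as in the new tests at the end of this patch (raw format plus a canal-json multiple format and the raw-hash partitioner) would emit table options roughly as follows. This is a sketch based on the branches above, with illustrative values taken from the test setup, not captured parser output:

    'connector' = 'kafka-inlong'
    'topic' = 'topic_output'
    'properties.bootstrap.servers' = 'localhost:9092'
    'sink.ignore.changelog' = 'true'
    'format' = 'raw'
    'sink.multiple.format' = 'canal-json'
    'sink.partitioner' = 'raw-hash'
    'sink.multiple.partition-pattern' = '${database}_${table}'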
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 225985079..b6597303d 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -85,6 +85,26 @@ public final class Constants {
public static final String INLONG_METRIC_STATE_NAME = "inlong-metric-states";
+ public static final String PK_NAMES = "pkNames";
+
+ public static final String DATA = "data";
+
+ public static final String AFTER = "after";
+
+ public static final String BEFORE = "before";
+
+ public static final String SOURCE = "source";
+
+ /**
+ * It is used for jdbc url filter for avoiding url attack
+ * see also in https://su18.org/post/jdbc-connection-url-attack/
+ */
+ public static final String AUTO_DESERIALIZE = "autoDeserialize";
+
+ public static final String AUTO_DESERIALIZE_TRUE = "autoDeserialize=true";
+
+ public static final String AUTO_DESERIALIZE_FALSE = "autoDeserialize=false";
+
public static final ConfigOption<String> INLONG_METRIC =
ConfigOptions.key("inlong.metric.labels")
.stringType()
@@ -112,14 +132,26 @@ public final class Constants {
.withDescription(
"The format of multiple sink, it represents the
real format of the raw binary data");
- /**
- * It is used for jdbc url filter for avoiding url attack
- * see also in https://su18.org/post/jdbc-connection-url-attack/
- */
- public static final String AUTO_DESERIALIZE = "autoDeserialize";
-
- public static final String AUTO_DESERIALIZE_TRUE = "autoDeserialize=true";
+ public static final ConfigOption<String> SINK_MULTIPLE_DATABASE_PATTERN =
+ ConfigOptions.key("sink.multiple.database-pattern")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The option 'sink.multiple.database-pattern' "
+ + "is used to extract the database name from the raw binary data, "
+ + "this is only used in the multiple sink writing scenario.");
- public static final String AUTO_DESERIALIZE_FALSE = "autoDeserialize=false";
+ public static final ConfigOption<String> SINK_MULTIPLE_TABLE_PATTERN =
+ ConfigOptions.key("sink.multiple.table-pattern")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The option 'sink.multiple.table-pattern' "
+ + "is used to extract the table name from the raw binary data, "
+ + "this is only used in the multiple sink writing scenario.");
+ public static final ConfigOption<Boolean> SINK_MULTIPLE_ENABLE =
+ ConfigOptions.key("sink.multiple.enable")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("The option 'sink.multiple.enable' "
+ + "is used to determine whether to support multiple sink writing, default is 'false'.");
}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java
index 397497133..a33d9b003 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java
@@ -40,18 +40,31 @@ public abstract class AbstractDynamicSchemaFormat<T> {
public static final Pattern PATTERN =
Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}", Pattern.CASE_INSENSITIVE);
/**
- * Extract value by key from the raw data
+ * Extract values by keys from the raw data
*
* @param message The byte array of raw data
* @param keys The key list that will be used to extract
* @return The value list maps the keys
* @throws IOException The exceptions may throws when extract
*/
- public List<String> extract(byte[] message, String... keys) throws IOException {
+ public List<String> extractValues(byte[] message, String... keys) throws IOException {
+ if (keys == null || keys.length == 0) {
+ return new ArrayList<>();
+ }
+ return extractValues(deserialize(message), keys);
+ }
+
+ /**
+ * Extract values by keys from the raw data
+ *
+ * @param data The raw data
+ * @param keys The key list that will be used to extract
+ * @return The value list maps the keys
+ */
+ public List<String> extractValues(T data, String... keys) {
if (keys == null || keys.length == 0) {
return new ArrayList<>();
}
- final T data = deserialize(message);
List<String> values = new ArrayList<>(keys.length);
for (String key : keys) {
values.add(extract(data, key));
@@ -68,6 +81,39 @@ public abstract class AbstractDynamicSchemaFormat<T> {
*/
public abstract String extract(T data, String key);
+ /**
+ * Extract primary key names
+ *
+ * @param data The raw data
+ * @return The primary key name list
+ */
+ public abstract List<String> extractPrimaryKeyNames(T data);
+
+ /**
+ * Extract primary key values
+ *
+ * @param message The byte array of raw data
+ * @return The values of primary key
+ * @throws IOException The exception may be thrown when executing
+ */
+ public List<String> extractPrimaryKeyValues(byte[] message) throws IOException {
+ return extractPrimaryKeyValues(deserialize(message));
+ }
+
+ /**
+ * Extract primary key values
+ *
+ * @param data The raw data
+ * @return The values of primary key
+ */
+ public List<String> extractPrimaryKeyValues(T data) {
+ List<String> pkNames = extractPrimaryKeyNames(data);
+ if (pkNames == null || pkNames.isEmpty()) {
+ return new ArrayList<>();
+ }
+ return extractValues(data, pkNames.toArray(new String[]{}));
+ }
+
/**
* Deserialize from byte array
*
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java
index ffd994f2d..6a6b67d38 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java
@@ -19,6 +19,11 @@ package org.apache.inlong.sort.base.format;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import java.util.ArrayList;
+import java.util.List;
+import static org.apache.inlong.sort.base.Constants.DATA;
+import static org.apache.inlong.sort.base.Constants.PK_NAMES;
+
/**
* Canal json dynamic format
*/
@@ -38,14 +43,26 @@ public class CanalJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
}
@Override
- protected JsonNode getPhysicalData(JsonNode root) {
- JsonNode physicalData = root.get("data");
+ public JsonNode getPhysicalData(JsonNode root) {
+ JsonNode physicalData = root.get(DATA);
if (physicalData != null) {
- return root.get("data").get(0);
+ return root.get(DATA).get(0);
}
return null;
}
+ @Override
+ public List<String> extractPrimaryKeyNames(JsonNode data) {
+ JsonNode pkNamesNode = data.get(PK_NAMES);
+ List<String> pkNames = new ArrayList<>();
+ if (pkNamesNode != null && pkNamesNode.isArray()) {
+ for (int i = 0; i < pkNamesNode.size(); i++) {
+ pkNames.add(pkNamesNode.get(i).asText());
+ }
+ }
+ return pkNames;
+ }
+
/**
* Get the identifier of this dynamic schema format
*
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java
index c5298245e..11754226d 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java
@@ -19,6 +19,13 @@ package org.apache.inlong.sort.base.format;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import java.util.ArrayList;
+import java.util.List;
+import static org.apache.inlong.sort.base.Constants.AFTER;
+import static org.apache.inlong.sort.base.Constants.BEFORE;
+import static org.apache.inlong.sort.base.Constants.PK_NAMES;
+import static org.apache.inlong.sort.base.Constants.SOURCE;
+
/**
* Debezium json dynamic format
*/
@@ -38,14 +45,30 @@ public class DebeziumJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
}
@Override
- protected JsonNode getPhysicalData(JsonNode root) {
- JsonNode physicalData = root.get("after");
+ public JsonNode getPhysicalData(JsonNode root) {
+ JsonNode physicalData = root.get(AFTER);
if (physicalData == null) {
- physicalData = root.get("before");
+ physicalData = root.get(BEFORE);
}
return physicalData;
}
+ @Override
+ public List<String> extractPrimaryKeyNames(JsonNode data) {
+ List<String> pkNames = new ArrayList<>();
+ JsonNode sourceNode = data.get(SOURCE);
+ if (sourceNode == null) {
+ return pkNames;
+ }
+ JsonNode pkNamesNode = sourceNode.get(PK_NAMES);
+ if (pkNamesNode != null && pkNamesNode.isArray()) {
+ for (int i = 0; i < pkNamesNode.size(); i++) {
+ pkNames.add(pkNamesNode.get(i).asText());
+ }
+ }
+ return pkNames;
+ }
+
/**
* Get the identifier of this dynamic schema format
*
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
index a238b1369..b8be01e80 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
@@ -17,12 +17,15 @@
package org.apache.inlong.sort.base.format;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.regex.Matcher;
/**
@@ -43,19 +46,17 @@ public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaForma
private final ObjectMapper objectMapper = new ObjectMapper();
/**
- * Extract value by key from the raw data
+ * Extract values by keys from the raw data
*
- * @param message The byte array of raw data
+ * @param root The raw data
* @param keys The key list that will be used to extract
* @return The value list maps the keys
- * @throws IOException The exceptions may throws when extract
*/
@Override
- public List<String> extract(byte[] message, String... keys) throws IOException {
+ public List<String> extractValues(JsonNode root, String... keys) {
if (keys == null || keys.length == 0) {
return new ArrayList<>();
}
- final JsonNode root = deserialize(message);
JsonNode physicalNode = getPhysicalData(root);
List<String> values = new ArrayList<>(keys.length);
if (physicalNode == null) {
@@ -164,5 +165,21 @@ public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaForma
* @param root The json root node
* @return The physical data node
*/
- protected abstract JsonNode getPhysicalData(JsonNode root);
+ public abstract JsonNode getPhysicalData(JsonNode root);
+
+ /**
+ * Convert physical data to map
+ *
+ * @param root The json root node
+ * @return The map of physicalData
+ * @throws IOException The exception may be thrown when executing
+ */
+ public Map<String, String> physicalDataToMap(JsonNode root) throws IOException {
+ JsonNode physicalData = getPhysicalData(root);
+ if (physicalData == null) {
+ return new HashMap<>();
+ }
+ return objectMapper.convertValue(physicalData, new TypeReference<Map<String, String>>() {
+ });
+ }
}
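To make the extraction path concrete: for a canal-json record along the lines of {"data": [{"id": "111"}], "pkNames": ["id"]} (a trimmed, illustrative record), extractPrimaryKeyNames reads ["id"] from the top-level 'pkNames' field, and extractPrimaryKeyValues then resolves those names against the physical 'data' node to ["111"]; the debezium-json variant reads 'pkNames' from the 'source' node instead. The tests below assert exactly these values.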
diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java
index 2d12a4bee..ffa45e8c2 100644
--- a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java
+++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java
@@ -18,8 +18,14 @@
package org.apache.inlong.sort.base.format;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.junit.Assert;
+import org.junit.Test;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
/**
@@ -80,6 +86,16 @@ public class CanalJsonDynamicSchemaFormatTest extends DynamicSchemaFormatBaseTes
return expectedValues;
}
+ @Test
+ @SuppressWarnings({"unchecked"})
+ public void testExtractPrimaryKey() throws IOException {
+ JsonNode rootNode = (JsonNode) getDynamicSchemaFormat()
+ .deserialize(getSource().getBytes(StandardCharsets.UTF_8));
+ List<String> primaryKeys = getDynamicSchemaFormat().extractPrimaryKeyNames(rootNode);
+ List<String> values = getDynamicSchemaFormat().extractValues(rootNode, primaryKeys.toArray(new String[]{}));
+ Assert.assertEquals(Collections.singletonList("111"), values);
+ }
+
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
protected AbstractDynamicSchemaFormat getDynamicSchemaFormat() {
diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java
index 81c0e512e..bbd51dac6 100644
--- a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java
+++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java
@@ -18,8 +18,14 @@
package org.apache.inlong.sort.base.format;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.junit.Assert;
+import org.junit.Test;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
/**
@@ -43,6 +49,7 @@ public class DebeziumJsonDynamicSchemaFormatTest extends DynamicSchemaFormatBase
+ " \"weight\": 5.15\n"
+ " },\n"
+ " \"source\": {\"version\": \"0.9.5.Final\",\n"
+ + "\"pkNames\":[\"id\", \"name\"],"
+ "\t\"connector\": \"mysql\",\n"
+ "\t\"name\": \"fullfillment\",\n"
+ "\t\"server_id\" :1,\n"
@@ -71,6 +78,16 @@ public class DebeziumJsonDynamicSchemaFormatTest extends DynamicSchemaFormatBase
return expectedValues;
}
+ @Test
+ @SuppressWarnings({"unchecked"})
+ public void testExtractPrimaryKey() throws IOException {
+ JsonNode rootNode = (JsonNode) getDynamicSchemaFormat()
+ .deserialize(getSource().getBytes(StandardCharsets.UTF_8));
+ List<String> primaryKeys = getDynamicSchemaFormat().extractPrimaryKeyNames(rootNode);
+ List<String> values = getDynamicSchemaFormat().extractValues(rootNode, primaryKeys.toArray(new String[]{}));
+ Assert.assertEquals(Arrays.asList("111", "scooter"), values);
+ }
+
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
protected AbstractDynamicSchemaFormat getDynamicSchemaFormat() {
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/partitioner/RawDataHashPartitioner.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/partitioner/RawDataHashPartitioner.java
new file mode 100644
index 000000000..2f19e0489
--- /dev/null
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/partitioner/RawDataHashPartitioner.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.partitioner;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * The raw data hash partitioner is used to extract the partition from the raw data (byte array).
+ * It needs a partitionPattern used to parse the partition key and a format {@link RawDataHashPartitioner#sinkMultipleFormat}
+ * to deserialize the raw data (byte array).
+ * If the 'partitionPattern' equals 'PRIMARY_KEY', this partitioner extracts the primary key from the raw data
+ * and hashes it as the partition key; otherwise it parses the partition key from the raw data.
+ *
+ * @param <T> The record type
+ */
+public class RawDataHashPartitioner<T> extends FlinkKafkaPartitioner<T> {
+
+ /**
+ * The primary key constant, this partitioner will extract the primary key from the raw data
+ * if the 'partitionPattern' equals 'PRIMARY_KEY'
+ */
+ public static final String PRIMARY_KEY = "PRIMARY_KEY";
+ private static final Logger LOG = LoggerFactory.getLogger(RawDataHashPartitioner.class);
+ private static final long serialVersionUID = 1L;
+ /**
+ * The partition pattern used to extract partition
+ */
+ private String partitionPattern;
+
+ /**
+ * The format used to deserialize the raw data (byte array)
+ */
+ private String sinkMultipleFormat;
+
+ @SuppressWarnings({"rawtypes"})
+ private AbstractDynamicSchemaFormat dynamicSchemaFormat;
+
+ @Override
+ public void open(int parallelInstanceId, int parallelInstances) {
+ super.open(parallelInstanceId, parallelInstances);
+ dynamicSchemaFormat = DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
+ }
+
+ @Override
+ @SuppressWarnings({"unchecked"})
+ public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+ Preconditions.checkArgument(
+ partitions != null && partitions.length > 0,
+ "Partitions of the target topic is empty.");
+ int partition = 0;
+ try {
+ String partitionKey;
+ if (PRIMARY_KEY.equals(partitionPattern)) {
+ List<String> values = dynamicSchemaFormat.extractPrimaryKeyValues(value);
+ if (values == null || values.isEmpty()) {
+ return partition;
+ }
+ partitionKey = StringUtils.join(values, "");
+ } else {
+ partitionKey = dynamicSchemaFormat.parse(value, partitionPattern);
+ }
+ partition = partitions[(partitionKey.hashCode() & Integer.MAX_VALUE) % partitions.length];
+ } catch (Exception e) {
+ LOG.warn("Extract partition failed", e);
+ }
+ return partition;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return o instanceof RawDataHashPartitioner;
+ }
+
+ @Override
+ public int hashCode() {
+ return RawDataHashPartitioner.class.hashCode();
+ }
+
+ public String getPartitionPattern() {
+ return partitionPattern;
+ }
+
+ public void setPartitionPattern(String partitionPattern) {
+ this.partitionPattern = partitionPattern;
+ }
+
+ public String getSinkMultipleFormat() {
+ return sinkMultipleFormat;
+ }
+
+ public void setSinkMultipleFormat(String sinkMultipleFormat) {
+ this.sinkMultipleFormat = sinkMultipleFormat;
+ }
+}
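For context, a minimal sketch of a Flink SQL sink table that enables this partitioner (the table name and schema are illustrative; the option keys are the ones validated in KafkaDynamicTableFactory below):

    CREATE TABLE kafka_output (
        raw VARBINARY
    ) WITH (
        'connector' = 'kafka-inlong',
        'topic' = 'topic_output',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'raw',
        'sink.multiple.format' = 'canal-json',
        'sink.partitioner' = 'raw-hash',
        'sink.multiple.partition-pattern' = '${database}_${table}'
    );

Setting 'sink.multiple.partition-pattern' = 'PRIMARY_KEY' instead makes the partitioner hash the primary key values extracted from each record.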
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
index 49ca0a56a..480d55948 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
@@ -53,6 +54,7 @@ import org.apache.flink.types.RowKind;
import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
import org.apache.inlong.sort.kafka.KafkaDynamicSink;
+import org.apache.inlong.sort.kafka.partitioner.RawDataHashPartitioner;
import javax.annotation.Nullable;
import java.time.Duration;
@@ -89,11 +91,11 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.VAL
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.autoCompleteSchemaRegistrySubject;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.createKeyFormatProjection;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.createValueFormatProjection;
-import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getFlinkKafkaPartitioner;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getKafkaProperties;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getSinkSemantic;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getStartupOptions;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableSourceOptions;
+import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
@@ -111,6 +113,14 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
public static final String IDENTIFIER = "kafka-inlong";
+ public static final String SINK_PARTITIONER_VALUE_RAW_HASH = "raw-hash";
+
+ public static final ConfigOption<String> SINK_MULTIPLE_PARTITION_PATTERN =
+ ConfigOptions.key("sink.multiple.partition-pattern")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("option 'sink.multiple.partition-pattern'
used when the partitioner is raw-hash.");
+
private static final Set<String> SINK_SEMANTIC_ENUMS =
new HashSet<>(
Arrays.asList(
@@ -206,6 +216,19 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
throw new ValidationException(
"Currently 'round-robin' partitioner only
works "
+ "when option 'key.fields' is not
specified.");
+ } else if (SINK_PARTITIONER_VALUE_RAW_HASH.equals(partitioner)
+ || "org.apache.inlong.sort.kafka.partitioner.RawDataHashPartitioner".equals(partitioner)) {
+ boolean invalid = !"raw".equals(tableOptions.getOptional(FORMAT).orElse(null))
+ || !tableOptions.getOptional(SINK_MULTIPLE_FORMAT).isPresent()
+ || !tableOptions.getOptional(SINK_MULTIPLE_PARTITION_PATTERN).isPresent()
+ || tableOptions.getOptional(SINK_MULTIPLE_FORMAT).get().isEmpty()
+ || tableOptions.getOptional(SINK_MULTIPLE_PARTITION_PATTERN).get().isEmpty();
+ if (invalid) {
+ throw new ValidationException(
+ "Currently 'raw-hash' partitioner only works "
+ + "when option 'format' is 'raw' and options 'sink.multiple.format' "
+ + "and 'sink.multiple.partition-pattern' are specified.");
+ }
} else if (partitioner.isEmpty()) {
throw new ValidationException(
String.format(
@@ -215,6 +238,30 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
});
}
+ private Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
+ ReadableConfig tableOptions, ClassLoader classLoader) {
+ if (tableOptions.getOptional(SINK_PARTITIONER).isPresent()
+ && SINK_PARTITIONER_VALUE_RAW_HASH.equals(tableOptions.getOptional(SINK_PARTITIONER).get())) {
+ RawDataHashPartitioner<RowData> rawHashPartitioner = new RawDataHashPartitioner<>();
+ rawHashPartitioner.setSinkMultipleFormat(tableOptions.getOptional(SINK_MULTIPLE_FORMAT).orElse(null));
+ rawHashPartitioner.setPartitionPattern(tableOptions.getOptional(SINK_MULTIPLE_PARTITION_PATTERN)
+ .orElse(null));
+ return Optional.of(rawHashPartitioner);
+ }
+ Optional<FlinkKafkaPartitioner<RowData>> partitioner = KafkaOptions
+ .getFlinkKafkaPartitioner(tableOptions, classLoader);
+ if (partitioner.isPresent()) {
+ if (partitioner.get() instanceof RawDataHashPartitioner) {
+ RawDataHashPartitioner<RowData> rawHashPartitioner =
+ (RawDataHashPartitioner<RowData>) partitioner.get();
+ rawHashPartitioner.setSinkMultipleFormat(tableOptions.getOptional(SINK_MULTIPLE_FORMAT).orElse(null));
+ rawHashPartitioner.setPartitionPattern(tableOptions.getOptional(SINK_MULTIPLE_PARTITION_PATTERN)
+ .orElse(null));
+ }
+ }
+ return partitioner;
+ }
+
private void validateSinkSemantic(ReadableConfig tableOptions) {
tableOptions
.getOptional(SINK_SEMANTIC)
@@ -263,6 +310,7 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
options.add(INLONG_METRIC);
options.add(INLONG_AUDIT);
options.add(SINK_MULTIPLE_FORMAT);
+ options.add(SINK_MULTIPLE_PARTITION_PATTERN);
return options;
}
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java
index 783f1f4ec..1c3500563 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java
@@ -50,7 +50,8 @@ import java.util.Map;
import java.util.stream.Collectors;
/**
- * Test for kafka load sql parse
+ * Test for {@link KafkaLoadNode} sql parse
+ * and the raw hash partitioner {@link org.apache.inlong.sort.kafka.partitioner.RawDataHashPartitioner}
*/
public class KafkaLoadSqlParseTest extends AbstractTestBase {
@@ -93,7 +94,19 @@ public class KafkaLoadSqlParseTest extends AbstractTestBase {
new FieldInfo("raw", new VarBinaryFormatInfo())));
return new KafkaLoadNode("2", "kafka_output", fields, relations, null,
null,
"topic_output", "localhost:9092", new RawFormat(), null,
- null, null, new CanalJsonFormat(), "${database}_${table}");
+ null, null, new CanalJsonFormat(), "${database}_${table}",
+ null, null);
+ }
+
+ private KafkaLoadNode buildKafkaLoadNodeWithDynamicPartition(String pattern) {
+ List<FieldInfo> fields = Collections.singletonList(new FieldInfo("raw", new VarBinaryFormatInfo()));
+ List<FieldRelation> relations = Collections
+ .singletonList(new FieldRelation(new FieldInfo("raw", new VarBinaryFormatInfo()),
+ new FieldInfo("raw", new VarBinaryFormatInfo())));
+ return new KafkaLoadNode("2", "kafka_output", fields, relations, null, null,
+ "topic_output", "localhost:9092", new RawFormat(), null,
+ null, null, new CanalJsonFormat(), null,
+ "raw-hash", pattern);
}
/**
@@ -137,7 +150,6 @@ public class KafkaLoadSqlParseTest extends AbstractTestBase {
Assert.assertTrue(result.tryExecute());
}
-
/**
* Test kafka to kafka with dynamic topic
*
@@ -165,4 +177,60 @@ public class KafkaLoadSqlParseTest extends AbstractTestBase {
ParseResult result = parser.parse();
Assert.assertTrue(result.tryExecute());
}
+
+ /**
+ * Test kafka to kafka with dynamic partition
+ *
+ * @throws Exception The exception may be thrown when executing
+ */
+ @Test
+ public void testKafkaDynamicPartitionParse() throws Exception {
+ EnvironmentSettings settings = EnvironmentSettings
+ .newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(10000);
+ env.disableOperatorChaining();
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+ Node inputNode = buildKafkaExtractNode();
+ Node outputNode = buildKafkaLoadNodeWithDynamicPartition("${database}_${table}");
+ StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, outputNode),
+ Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+ Collections.singletonList(outputNode))));
+ GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
+ FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+ ParseResult result = parser.parse();
+ Assert.assertTrue(result.tryExecute());
+ }
+
+ /**
+ * Test kafka to kafka with dynamic partition based on hash of primary key
+ *
+ * @throws Exception The exception may be thrown when executing
+ */
+ @Test
+ public void testKafkaDynamicPartitionWithPrimaryKey() throws Exception {
+ EnvironmentSettings settings = EnvironmentSettings
+ .newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(10000);
+ env.disableOperatorChaining();
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+ Node inputNode = buildKafkaExtractNode();
+ Node outputNode = buildKafkaLoadNodeWithDynamicPartition("PRIMARY_KEY");
+ StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, outputNode),
+ Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+ Collections.singletonList(outputNode))));
+ GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
+ FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+ ParseResult result = parser.parse();
+ Assert.assertTrue(result.tryExecute());
+ }
}