This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 803941656 [INLONG-7103][Sort] Support InLongMsg format in Kafka (#7107)
803941656 is described below
commit 8039416562f931fa22c342728ad88e0ff7e0ad36
Author: feat <[email protected]>
AuthorDate: Thu Jan 5 10:35:58 2023 +0800
[INLONG-7103][Sort] Support InLongMsg format in Kafka (#7107)
Co-authored-by: healchow <[email protected]>
---
.../protocol/node/extract/KafkaExtractNode.java | 53 +++++++++++++++++++---
.../node/extract/KafkaExtractNodeTest.java | 22 +++++++++
2 files changed, 68 insertions(+), 7 deletions(-)
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
index 718c3c21c..6a0501759 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
@@ -18,6 +18,8 @@
package org.apache.inlong.sort.protocol.node.extract;
import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map.Entry;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.apache.commons.lang3.StringUtils;
@@ -38,6 +40,7 @@ import
org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
import org.apache.inlong.sort.protocol.node.format.CsvFormat;
import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
import org.apache.inlong.sort.protocol.node.format.Format;
+import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
import org.apache.inlong.sort.protocol.node.format.JsonFormat;
import org.apache.inlong.sort.protocol.node.format.RawFormat;
import org.apache.inlong.sort.protocol.transformation.WatermarkField;
@@ -133,7 +136,17 @@ public class KafkaExtractNode extends ExtractNode
implements InlongMetric, Metad
}
/**
- * generate table options
+ * Generate table options for Kafka extract node.
+ * <p/>
+ * Upsert Kafka stores message keys and values as bytes, so there is no need
to specify the schema or data types for Kafka.
+ * <br/>
+ * The messages of Kafka are serialized and deserialized by formats, e.g.
csv, json, avro.
+ * <br/>
+ * Thus, the data type mapping is determined by specific formats.
+ * <p/>
+ * For more details:
+ * <a
href="https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/">
+ * upsert-kafka</a>
*
* @return options
*/
@@ -142,7 +155,12 @@ public class KafkaExtractNode extends ExtractNode
implements InlongMetric, Metad
Map<String, String> options = super.tableOptions();
options.put(KafkaConstant.TOPIC, topic);
options.put(KafkaConstant.PROPERTIES_BOOTSTRAP_SERVERS,
bootstrapServers);
- if (format instanceof JsonFormat || format instanceof AvroFormat ||
format instanceof CsvFormat) {
+
+ boolean wrapWithInlongMsg = format instanceof InLongMsgFormat;
+ Format realFormat = wrapWithInlongMsg ? ((InLongMsgFormat)
format).getInnerFormat() : format;
+ if (realFormat instanceof JsonFormat
+ || realFormat instanceof AvroFormat
+ || realFormat instanceof CsvFormat) {
if (StringUtils.isEmpty(this.primaryKey)) {
options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA);
options.put(KafkaConstant.SCAN_STARTUP_MODE,
kafkaScanStartupMode.getValue());
@@ -152,13 +170,14 @@ public class KafkaExtractNode extends ExtractNode
implements InlongMetric, Metad
if (StringUtils.isNotBlank(scanTimestampMillis)) {
options.put(KafkaConstant.SCAN_STARTUP_TIMESTAMP_MILLIS,
scanTimestampMillis);
}
- options.putAll(format.generateOptions(false));
+
options.putAll(delegateInlongFormat(realFormat.generateOptions(false),
wrapWithInlongMsg));
} else {
options.put(KafkaConstant.CONNECTOR,
KafkaConstant.UPSERT_KAFKA);
- options.putAll(format.generateOptions(true));
+
options.putAll(delegateInlongFormat(realFormat.generateOptions(true),
wrapWithInlongMsg));
}
- } else if (format instanceof CanalJsonFormat || format instanceof
DebeziumJsonFormat
- || format instanceof RawFormat) {
+ } else if (realFormat instanceof CanalJsonFormat
+ || realFormat instanceof DebeziumJsonFormat
+ || realFormat instanceof RawFormat) {
options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA);
options.put(KafkaConstant.SCAN_STARTUP_MODE,
kafkaScanStartupMode.getValue());
if (StringUtils.isNotEmpty(scanSpecificOffsets)) {
@@ -167,7 +186,7 @@ public class KafkaExtractNode extends ExtractNode
implements InlongMetric, Metad
if (StringUtils.isNotBlank(scanTimestampMillis)) {
options.put(KafkaConstant.SCAN_STARTUP_TIMESTAMP_MILLIS,
scanTimestampMillis);
}
- options.putAll(format.generateOptions(false));
+
options.putAll(delegateInlongFormat(realFormat.generateOptions(false),
wrapWithInlongMsg));
} else {
throw new IllegalArgumentException("kafka extract node format is
IllegalArgument");
}
@@ -177,6 +196,26 @@ public class KafkaExtractNode extends ExtractNode
implements InlongMetric, Metad
return options;
}
+ private Map<String, String> delegateInlongFormat(
+ Map<String, String> realOptions,
+ boolean wrapWithInlongMsg) {
+ if (!wrapWithInlongMsg) {
+ return realOptions;
+ }
+ Map<String, String> options = new HashMap<>();
+ for (Entry<String, String> entry : realOptions.entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+ if ("format".equals(key)) {
+ options.put("format", "inlong-msg");
+ options.put("inlong-msg.inner.format", value);
+ } else {
+ options.put("inlong-msg." + key, value);
+ }
+ }
+ return options;
+ }
+
@Override
public String genTableName() {
return String.format("table_%s", super.getId());
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
index 906db8231..cda33fae9 100644
---
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
@@ -26,6 +26,7 @@ import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
import org.apache.inlong.sort.protocol.node.format.CsvFormat;
+import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
import org.apache.inlong.sort.protocol.node.format.RawFormat;
import org.junit.Assert;
import org.junit.Test;
@@ -112,4 +113,25 @@ public class KafkaExtractNodeTest extends
SerializeBaseTest<KafkaExtractNode> {
}
Assert.assertTrue(formatEquals);
}
+
+ @Test
+ public void testInLongFormat() {
+ List<FieldInfo> fields = Arrays.asList(
+ new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo()));
+
+ KafkaExtractNode kafkaNode = getTestObject();
+ InLongMsgFormat inLongMsgFormat = new InLongMsgFormat(new CsvFormat(),
false);
+ kafkaNode.setFormat(inLongMsgFormat);
+
+ Map<String, String> options = kafkaNode.tableOptions();
+ assertEquals("inlong-msg", options.get("format"));
+ assertEquals("csv", options.get("inlong-msg.inner.format"));
+ assertEquals("true",
options.get("inlong-msg.csv.ignore-parse-errors"));
+
+ kafkaNode.setFormat(new CsvFormat());
+ Map<String, String> csvOptions = kafkaNode.tableOptions();
+ assertEquals("csv", csvOptions.get("format"));
+ assertEquals("true", csvOptions.get("csv.ignore-parse-errors"));
+ }
}