This is an automated email from the ASF dual-hosted git repository.
pacinogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 2a2265a84c [INLONG-9695][Sort] Fix kafka extract node option config
building error when use upsert-kafka connector (#9697)
2a2265a84c is described below
commit 2a2265a84cb803d30e2e721d71733f11588dde99
Author: Xin Gong <[email protected]>
AuthorDate: Tue Feb 20 10:22:22 2024 +0800
[INLONG-9695][Sort] Fix kafka extract node option config building error
when use upsert-kafka connector (#9697)
---
.../protocol/node/extract/KafkaExtractNode.java | 16 +++++-----
.../inlong/sort/parser/KafkaSqlParseTest.java | 37 ++++++++++++++++++++++
2 files changed, 45 insertions(+), 8 deletions(-)
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
index 200e8ee70c..f595f0f85d 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
@@ -118,7 +118,7 @@ public class KafkaExtractNode extends ExtractNode
implements InlongMetric, Metad
this.topic = Preconditions.checkNotNull(topic, "kafka topic is empty");
this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers,
"kafka bootstrapServers is empty");
this.format = Preconditions.checkNotNull(format, "kafka format is
empty");
- this.kafkaScanStartupMode =
Preconditions.checkNotNull(kafkaScanStartupMode, "kafka scanStartupMode is
empty");
+ this.kafkaScanStartupMode = kafkaScanStartupMode;
this.primaryKey = primaryKey;
this.groupId = groupId;
if (kafkaScanStartupMode == KafkaScanStartupMode.SPECIFIC_OFFSETS) {
@@ -154,19 +154,19 @@ public class KafkaExtractNode extends ExtractNode
implements InlongMetric, Metad
Map<String, String> options = super.tableOptions();
options.put(KafkaConstant.TOPIC, topic);
options.put(KafkaConstant.PROPERTIES_BOOTSTRAP_SERVERS,
bootstrapServers);
- options.put(KafkaConstant.SCAN_STARTUP_MODE,
kafkaScanStartupMode.getValue());
if (isUpsertKafkaConnector(format,
!StringUtils.isEmpty(this.primaryKey))) {
options.put(KafkaConstant.CONNECTOR, KafkaConstant.UPSERT_KAFKA);
options.putAll(format.generateOptions(true));
} else {
options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA);
options.putAll(format.generateOptions(false));
- }
- if (StringUtils.isNotEmpty(scanSpecificOffsets)) {
- options.put(KafkaConstant.SCAN_STARTUP_SPECIFIC_OFFSETS,
scanSpecificOffsets);
- }
- if (StringUtils.isNotBlank(scanTimestampMillis)) {
- options.put(KafkaConstant.SCAN_STARTUP_TIMESTAMP_MILLIS,
scanTimestampMillis);
+ options.put(KafkaConstant.SCAN_STARTUP_MODE,
kafkaScanStartupMode.getValue());
+ if (StringUtils.isNotEmpty(scanSpecificOffsets)) {
+ options.put(KafkaConstant.SCAN_STARTUP_SPECIFIC_OFFSETS,
scanSpecificOffsets);
+ }
+ if (StringUtils.isNotBlank(scanTimestampMillis)) {
+ options.put(KafkaConstant.SCAN_STARTUP_TIMESTAMP_MILLIS,
scanTimestampMillis);
+ }
}
if (StringUtils.isNotEmpty(groupId)) {
options.put(KafkaConstant.PROPERTIES_GROUP_ID, groupId);
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaSqlParseTest.java
index 79991404af..6c91da6bba 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaSqlParseTest.java
@@ -126,6 +126,16 @@ public class KafkaSqlParseTest extends AbstractTestBase {
null, "1665198979108");
}
+ private KafkaExtractNode buildUpsertKafkaExtract() {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new
LongFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo()));
+ return new KafkaExtractNode("1", "upsert-kafka_input", fields, null,
+ null, "topic_input", "localhost:9092",
+ new JsonFormat(), null, "id", "groupId",
+ null, null);
+ }
+
private Node buildMysqlLoadNodeForRawFormat() {
List<FieldInfo> fields = Arrays.asList(new FieldInfo("log", new
StringFormatInfo()));
List<FieldRelation> relations = Arrays
@@ -162,4 +172,31 @@ public class KafkaSqlParseTest extends AbstractTestBase {
ParseResult result = parser.parse();
Assert.assertTrue(result.tryExecute());
}
+
+ /**
+ * Test flink sql task for extract is upsert-kafka {@link
KafkaExtractNode} and load is mysql {@link MySqlLoadNode}
+ *
+ * @throws Exception The exception may be thrown when executing
+ */
+ @Test
+ public void testUpsertKafkaExtractNodeSqlParse() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(10000);
+ env.disableOperatorChaining();
+ EnvironmentSettings settings = EnvironmentSettings
+ .newInstance()
+ .inStreamingMode()
+ .build();
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
+ Node inputNode = buildUpsertKafkaExtract();
+ Node outputNode = buildMysqlLoadNode();
+ StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode,
outputNode),
+
Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+ Collections.singletonList(outputNode))));
+ GroupInfo groupInfo = new GroupInfo("1",
Collections.singletonList(streamInfo));
+ FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv,
groupInfo);
+ ParseResult result = parser.parse();
+ Assert.assertTrue(result.tryExecute());
+ }
}