This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch release-3.2
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/release-3.2 by this push:
new d63512ea1 [FLINK-36572][pipeline-connector][starrocks] Fix the issue
that the local time zone is wrongly set
d63512ea1 is described below
commit d63512ea195a38f64fecc3cc282f77f613684539
Author: Runkang He <[email protected]>
AuthorDate: Thu Nov 21 10:04:09 2024 +0800
[FLINK-36572][pipeline-connector][starrocks] Fix the issue that the local
time zone is wrongly set
This closes #3733.
---
.../starrocks/sink/StarRocksDataSink.java | 6 ++++
.../starrocks/sink/StarRocksDataSinkFactory.java | 2 +-
.../sink/StarRocksDataSinkFactoryTest.java | 33 ++++++++++++++++++++++
3 files changed, 40 insertions(+), 1 deletion(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSink.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSink.java
index 4c1969ae9..9811a010d 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSink.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSink.java
@@ -17,6 +17,7 @@
package org.apache.flink.cdc.connectors.starrocks.sink;
+import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.common.sink.EventSinkProvider;
@@ -78,4 +79,9 @@ public class StarRocksDataSink implements DataSink,
Serializable {
sinkOptions.getPassword());
return new StarRocksMetadataApplier(catalog, tableCreateConfig,
schemaChangeConfig);
}
+
+ @VisibleForTesting
+ public ZoneId getZoneId() {
+ return zoneId;
+ }
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java
index d1995fee6..f9fe58034 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java
@@ -51,7 +51,7 @@ public class StarRocksDataSinkFactory implements
DataSinkFactory {
TableCreateConfig.from(context.getFactoryConfiguration());
SchemaChangeConfig schemaChangeConfig =
SchemaChangeConfig.from(context.getFactoryConfiguration());
- String zoneStr =
context.getFactoryConfiguration().get(PIPELINE_LOCAL_TIME_ZONE);
+ String zoneStr =
context.getPipelineConfiguration().get(PIPELINE_LOCAL_TIME_ZONE);
ZoneId zoneId =
PIPELINE_LOCAL_TIME_ZONE.defaultValue().equals(zoneStr)
? ZoneId.systemDefault()
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactoryTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactoryTest.java
index 7f498beb8..9b178874e 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactoryTest.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactoryTest.java
@@ -30,6 +30,7 @@ import
org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.time.ZoneId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -150,4 +151,36 @@ public class StarRocksDataSinkFactoryTest {
conf, conf,
Thread.currentThread().getContextClassLoader()));
Assertions.assertThat(dataSink).isInstanceOf(StarRocksDataSink.class);
}
+
+ @Test
+ void testCreateDataSinkWithSpecificedTimeZone() {
+ DataSinkFactory sinkFactory =
+ FactoryDiscoveryUtils.getFactoryByIdentifier("starrocks",
DataSinkFactory.class);
+
Assertions.assertThat(sinkFactory).isInstanceOf(StarRocksDataSinkFactory.class);
+
+ Configuration factoryConfiguration =
+ Configuration.fromMap(
+ ImmutableMap.<String, String>builder()
+ .put("jdbc-url", "jdbc:mysql://127.0.0.1:9030")
+ .put("load-url", "127.0.0.1:8030")
+ .put("username", "root")
+ .put("password", "")
+ .build());
+ Configuration pipelineConfiguration =
+ Configuration.fromMap(
+ ImmutableMap.<String, String>builder()
+ .put("local-time-zone", "America/Los_Angeles")
+ .build());
+ DataSink dataSink =
+ sinkFactory.createDataSink(
+ new FactoryHelper.DefaultContext(
+ factoryConfiguration,
+ pipelineConfiguration,
+
Thread.currentThread().getContextClassLoader()));
+ Assertions.assertThat(dataSink).isInstanceOf(StarRocksDataSink.class);
+
+ ZoneId zoneId = ((StarRocksDataSink) dataSink).getZoneId();
+ ZoneId expectedZondId = ZoneId.of("America/Los_Angeles");
+ Assertions.assertThat(zoneId).isEqualTo(expectedZondId);
+ }
}