This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new be9b52750 [FLINK-36572][pipeline-connector][starrocks] Fix the issue 
that the local time zone is wrongly set
be9b52750 is described below

commit be9b52750c7bb4835a7b8631ebdb17666aff6bb1
Author: Runkang He <hrun...@gmail.com>
AuthorDate: Mon Nov 11 13:55:02 2024 +0800

    [FLINK-36572][pipeline-connector][starrocks] Fix the issue that the local 
time zone is wrongly set
    
    This closes  #3655.
---
 .../starrocks/sink/StarRocksDataSink.java          |  6 ++++
 .../starrocks/sink/StarRocksDataSinkFactory.java   |  2 +-
 .../sink/StarRocksDataSinkFactoryTest.java         | 33 ++++++++++++++++++++++
 3 files changed, 40 insertions(+), 1 deletion(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSink.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSink.java
index 4c1969ae9..9811a010d 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSink.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSink.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.cdc.connectors.starrocks.sink;
 
+import org.apache.flink.cdc.common.annotation.VisibleForTesting;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.sink.DataSink;
 import org.apache.flink.cdc.common.sink.EventSinkProvider;
@@ -78,4 +79,9 @@ public class StarRocksDataSink implements DataSink, 
Serializable {
                         sinkOptions.getPassword());
         return new StarRocksMetadataApplier(catalog, tableCreateConfig, 
schemaChangeConfig);
     }
+
+    @VisibleForTesting
+    public ZoneId getZoneId() {
+        return zoneId;
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java
index d1995fee6..f9fe58034 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java
@@ -51,7 +51,7 @@ public class StarRocksDataSinkFactory implements 
DataSinkFactory {
                 TableCreateConfig.from(context.getFactoryConfiguration());
         SchemaChangeConfig schemaChangeConfig =
                 SchemaChangeConfig.from(context.getFactoryConfiguration());
-        String zoneStr = 
context.getFactoryConfiguration().get(PIPELINE_LOCAL_TIME_ZONE);
+        String zoneStr = 
context.getPipelineConfiguration().get(PIPELINE_LOCAL_TIME_ZONE);
         ZoneId zoneId =
                 PIPELINE_LOCAL_TIME_ZONE.defaultValue().equals(zoneStr)
                         ? ZoneId.systemDefault()
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactoryTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactoryTest.java
index 7f498beb8..9b178874e 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactoryTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactoryTest.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.time.ZoneId;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -150,4 +151,36 @@ public class StarRocksDataSinkFactoryTest {
                                 conf, conf, 
Thread.currentThread().getContextClassLoader()));
         Assertions.assertThat(dataSink).isInstanceOf(StarRocksDataSink.class);
     }
+
+    @Test
+    void testCreateDataSinkWithSpecificedTimeZone() {
+        DataSinkFactory sinkFactory =
+                FactoryDiscoveryUtils.getFactoryByIdentifier("starrocks", 
DataSinkFactory.class);
+        
Assertions.assertThat(sinkFactory).isInstanceOf(StarRocksDataSinkFactory.class);
+
+        Configuration factoryConfiguration =
+                Configuration.fromMap(
+                        ImmutableMap.<String, String>builder()
+                                .put("jdbc-url", "jdbc:mysql://127.0.0.1:9030")
+                                .put("load-url", "127.0.0.1:8030")
+                                .put("username", "root")
+                                .put("password", "")
+                                .build());
+        Configuration pipelineConfiguration =
+                Configuration.fromMap(
+                        ImmutableMap.<String, String>builder()
+                                .put("local-time-zone", "America/Los_Angeles")
+                                .build());
+        DataSink dataSink =
+                sinkFactory.createDataSink(
+                        new FactoryHelper.DefaultContext(
+                                factoryConfiguration,
+                                pipelineConfiguration,
+                                
Thread.currentThread().getContextClassLoader()));
+        Assertions.assertThat(dataSink).isInstanceOf(StarRocksDataSink.class);
+
+        ZoneId zoneId = ((StarRocksDataSink) dataSink).getZoneId();
+        ZoneId expectedZondId = ZoneId.of("America/Los_Angeles");
+        Assertions.assertThat(zoneId).isEqualTo(expectedZondId);
+    }
 }

Reply via email to