This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a11c7b7  [SeaTunnel#1178] Support specifying timestamp column and format in Druid Flink sink plugin (#1179)
a11c7b7 is described below

commit a11c7b77f85b7d37ed3fa38e8b6aed324de55808
Author: Benedict Jin <[email protected]>
AuthorDate: Fri Jan 28 17:00:51 2022 +0800

    [SeaTunnel#1178] Support specifying timestamp column and format in Druid Flink sink plugin (#1179)
---
 docs/en/flink/configuration/sink-plugins/Druid.md  | 51 ++++++++++++++++++++--
 .../seatunnel/flink/sink/DruidOutputFormat.java    | 13 ++++--
 .../org/apache/seatunnel/flink/sink/DruidSink.java |  8 +++-
 3 files changed, 64 insertions(+), 8 deletions(-)

diff --git a/docs/en/flink/configuration/sink-plugins/Druid.md b/docs/en/flink/configuration/sink-plugins/Druid.md
index d2aa4dc..7d95058 100644
--- a/docs/en/flink/configuration/sink-plugins/Druid.md
+++ b/docs/en/flink/configuration/sink-plugins/Druid.md
@@ -6,10 +6,12 @@ Write data to Apache Druid.
 
 ## Options
 
-| name            | type     | required | default value |
-| --------------- | -------- | -------- | ------------- |
-| coordinator_url | `String` | yes      | -             |
-| datasource      | `String` | yes      | -             |
+| name             | type     | required | default value |
+| ---------------- | -------- | -------- | ------------- |
+| coordinator_url  | `String` | yes      | -             |
+| datasource       | `String` | yes      | -             |
+| timestamp_column | `String` | no       | timestamp     |
+| timestamp_format | `String` | no       | auto          |
 
 ### coordinator_url [`String`]
 
@@ -19,11 +21,52 @@ The URL of Coordinator service in Apache Druid.
 
 The DataSource name in Apache Druid.
 
+### timestamp_column [`String`]
+
+The name of the column from which Apache Druid reads each row's timestamp. The default value is `timestamp`.
+
+### timestamp_format [`String`]
+
+The format of the values in the timestamp column. The default value is `auto`. It can be one of:
+
+- `iso`
+  - ISO8601 with 'T' separator, like "2000-01-01T01:02:03.456"
+
+- `posix`
+  - seconds since epoch
+
+- `millis`
+  - milliseconds since epoch
+
+- `micro`
+  - microseconds since epoch
+
+- `nano`
+  - nanoseconds since epoch
+
+- `auto`
+  - automatically detects ISO (either 'T' or space separator) or millis format
+
+- any [Joda DateTimeFormat](https://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html) string
+
 ## Example
 
+### Simple
+
+```hocon
+DruidSink {
+  coordinator_url = "http://localhost:8081/";
+  datasource = "wikipedia"
+}
+```
+
+### Specifying the timestamp column and format
+
 ```hocon
 DruidSink {
   coordinator_url = "http://localhost:8081/";
   datasource = "wikipedia"
+  timestamp_column = "timestamp"
+  timestamp_format = "auto"
 }
 ```
diff --git a/seatunnel-connectors/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidOutputFormat.java b/seatunnel-connectors/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidOutputFormat.java
index 9cb8fc4..1a638b7 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidOutputFormat.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidOutputFormat.java
@@ -58,14 +58,21 @@ public class DruidOutputFormat extends RichOutputFormat<Row> {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(DruidOutputFormat.class);
 
+    private static final String DEFAULT_TIMESTAMP_COLUMN = "timestamp";
+    private static final String DEFAULT_TIMESTAMP_FORMAT = "auto";
+
     private final transient StringBuffer data;
     private final String coordinatorURL;
     private final String datasource;
+    private final String timestampColumn;
+    private final String timestampFormat;
 
-    public DruidOutputFormat(String coordinatorURL, String datasource) {
+    public DruidOutputFormat(String coordinatorURL, String datasource, String 
timestampColumn, String timestampFormat) {
         this.data = new StringBuffer();
         this.coordinatorURL = coordinatorURL;
         this.datasource = datasource;
+        this.timestampColumn = timestampColumn == null ? 
DEFAULT_TIMESTAMP_COLUMN : timestampColumn;
+        this.timestampFormat = timestampFormat == null ? 
DEFAULT_TIMESTAMP_FORMAT : timestampFormat;
     }
 
     @Override
@@ -142,7 +149,7 @@ public class DruidOutputFormat extends RichOutputFormat<Row> {
                 new ParallelIndexIngestionSpec(
                         new DataSchema(
                                 this.datasource,
-                                new TimestampSpec("ts", "auto", null),
+                                new TimestampSpec(this.timestampColumn, 
this.timestampFormat, null),
                                 new DimensionsSpec(Collections.emptyList()),
                                 null,
                                 new UniformGranularitySpec(Granularities.HOUR, 
Granularities.MINUTE, false, null),
@@ -160,7 +167,7 @@ public class DruidOutputFormat extends RichOutputFormat<Row> {
                 null,
                 new InlineInputSource(this.data.toString()),
                 new CsvInputFormat(
-                        Arrays.asList("name", "ts"),
+                        Arrays.asList("name", timestampColumn),
                         "|",
                         null,
                         false,
diff --git a/seatunnel-connectors/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidSink.java b/seatunnel-connectors/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidSink.java
index 7c55063..4c25b2c 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidSink.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidSink.java
@@ -32,14 +32,18 @@ public class DruidSink implements FlinkBatchSink<Row, Row> {
 
     private static final String COORDINATOR_URL = "coordinator_url";
     private static final String DATASOURCE = "datasource";
+    private static final String TIMESTAMP_COLUMN = "timestamp_column";
+    private static final String TIMESTAMP_FORMAT = "timestamp_format";
 
     private Config config;
     private String coordinatorURL;
     private String datasource;
+    private String timestampColumn;
+    private String timestampFormat;
 
     @Override
     public DataSink<Row> outputBatch(FlinkEnvironment env, DataSet<Row> 
dataSet) {
-        return dataSet.output(new DruidOutputFormat(coordinatorURL, 
datasource));
+        return dataSet.output(new DruidOutputFormat(coordinatorURL, 
datasource, timestampColumn, timestampFormat));
     }
 
     @Override
@@ -61,5 +65,7 @@ public class DruidSink implements FlinkBatchSink<Row, Row> {
     public void prepare(FlinkEnvironment env) {
         this.coordinatorURL = config.getString(COORDINATOR_URL);
         this.datasource = config.getString(DATASOURCE);
+        this.timestampColumn = config.hasPath(TIMESTAMP_COLUMN) ? 
config.getString(TIMESTAMP_COLUMN) : null;
+        this.timestampFormat = config.hasPath(TIMESTAMP_FORMAT) ? 
config.getString(TIMESTAMP_FORMAT) : null;
     }
 }

Reply via email to