This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a11c7b7 [SeaTunnel#1178] Support specifying timestamp column and
format in Druid Flink sink plugin (#1179)
a11c7b7 is described below
commit a11c7b77f85b7d37ed3fa38e8b6aed324de55808
Author: Benedict Jin <[email protected]>
AuthorDate: Fri Jan 28 17:00:51 2022 +0800
[SeaTunnel#1178] Support specifying timestamp column and format in Druid
Flink sink plugin (#1179)
---
docs/en/flink/configuration/sink-plugins/Druid.md | 51 ++++++++++++++++++++--
.../seatunnel/flink/sink/DruidOutputFormat.java | 13 ++++--
.../org/apache/seatunnel/flink/sink/DruidSink.java | 8 +++-
3 files changed, 64 insertions(+), 8 deletions(-)
diff --git a/docs/en/flink/configuration/sink-plugins/Druid.md
b/docs/en/flink/configuration/sink-plugins/Druid.md
index d2aa4dc..7d95058 100644
--- a/docs/en/flink/configuration/sink-plugins/Druid.md
+++ b/docs/en/flink/configuration/sink-plugins/Druid.md
@@ -6,10 +6,12 @@ Write data to Apache Druid.
## Options
-| name | type | required | default value |
-| --------------- | -------- | -------- | ------------- |
-| coordinator_url | `String` | yes | - |
-| datasource | `String` | yes | - |
+| name | type | required | default value |
+| ---------------- | -------- | -------- | ------------- |
+| coordinator_url | `String` | yes | - |
+| datasource | `String` | yes | - |
+| timestamp_column | `String` | no | timestamp |
+| timestamp_format | `String` | no | auto |
### coordinator_url [`String`]
@@ -19,11 +21,52 @@ The URL of Coordinator service in Apache Druid.
The DataSource name in Apache Druid.
+### timestamp_column [`String`]
+
+The timestamp column name in Apache Druid. The default value is `timestamp`.
+
+### timestamp_format [`String`]
+
+The timestamp format in Apache Druid. The default value is `auto`; it can be one of:
+
+- `iso`
+ - ISO8601 with 'T' separator, like "2000-01-01T01:02:03.456"
+
+- `posix`
+ - seconds since epoch
+
+- `millis`
+ - milliseconds since epoch
+
+- `micro`
+ - microseconds since epoch
+
+- `nano`
+ - nanoseconds since epoch
+
+- `auto`
+ - automatically detects ISO (either 'T' or space separator) or millis format
+
+- any [Joda
DateTimeFormat](http://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html)
string
+
## Example
+### Simple
+
+```hocon
+DruidSink {
+ coordinator_url = "http://localhost:8081/"
+ datasource = "wikipedia"
+}
+```
+
+### Specified timestamp column and format
+
```hocon
DruidSink {
coordinator_url = "http://localhost:8081/"
datasource = "wikipedia"
+ timestamp_column = "timestamp"
+ timestamp_format = "auto"
}
```
diff --git
a/seatunnel-connectors/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidOutputFormat.java
b/seatunnel-connectors/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidOutputFormat.java
index 9cb8fc4..1a638b7 100644
---
a/seatunnel-connectors/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidOutputFormat.java
+++
b/seatunnel-connectors/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidOutputFormat.java
@@ -58,14 +58,21 @@ public class DruidOutputFormat extends
RichOutputFormat<Row> {
private static final Logger LOGGER =
LoggerFactory.getLogger(DruidOutputFormat.class);
+ private static final String DEFAULT_TIMESTAMP_COLUMN = "timestamp";
+ private static final String DEFAULT_TIMESTAMP_FORMAT = "auto";
+
private final transient StringBuffer data;
private final String coordinatorURL;
private final String datasource;
+ private final String timestampColumn;
+ private final String timestampFormat;
- public DruidOutputFormat(String coordinatorURL, String datasource) {
+ public DruidOutputFormat(String coordinatorURL, String datasource, String
timestampColumn, String timestampFormat) {
this.data = new StringBuffer();
this.coordinatorURL = coordinatorURL;
this.datasource = datasource;
+ this.timestampColumn = timestampColumn == null ?
DEFAULT_TIMESTAMP_COLUMN : timestampColumn;
+ this.timestampFormat = timestampFormat == null ?
DEFAULT_TIMESTAMP_FORMAT : timestampFormat;
}
@Override
@@ -142,7 +149,7 @@ public class DruidOutputFormat extends
RichOutputFormat<Row> {
new ParallelIndexIngestionSpec(
new DataSchema(
this.datasource,
- new TimestampSpec("ts", "auto", null),
+ new TimestampSpec(this.timestampColumn,
this.timestampFormat, null),
new DimensionsSpec(Collections.emptyList()),
null,
new UniformGranularitySpec(Granularities.HOUR,
Granularities.MINUTE, false, null),
@@ -160,7 +167,7 @@ public class DruidOutputFormat extends
RichOutputFormat<Row> {
null,
new InlineInputSource(this.data.toString()),
new CsvInputFormat(
- Arrays.asList("name", "ts"),
+ Arrays.asList("name", timestampColumn),
"|",
null,
false,
diff --git
a/seatunnel-connectors/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidSink.java
b/seatunnel-connectors/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidSink.java
index 7c55063..4c25b2c 100644
---
a/seatunnel-connectors/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidSink.java
+++
b/seatunnel-connectors/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidSink.java
@@ -32,14 +32,18 @@ public class DruidSink implements FlinkBatchSink<Row, Row> {
private static final String COORDINATOR_URL = "coordinator_url";
private static final String DATASOURCE = "datasource";
+ private static final String TIMESTAMP_COLUMN = "timestamp_column";
+ private static final String TIMESTAMP_FORMAT = "timestamp_format";
private Config config;
private String coordinatorURL;
private String datasource;
+ private String timestampColumn;
+ private String timestampFormat;
@Override
public DataSink<Row> outputBatch(FlinkEnvironment env, DataSet<Row>
dataSet) {
- return dataSet.output(new DruidOutputFormat(coordinatorURL,
datasource));
+ return dataSet.output(new DruidOutputFormat(coordinatorURL,
datasource, timestampColumn, timestampFormat));
}
@Override
@@ -61,5 +65,7 @@ public class DruidSink implements FlinkBatchSink<Row, Row> {
public void prepare(FlinkEnvironment env) {
this.coordinatorURL = config.getString(COORDINATOR_URL);
this.datasource = config.getString(DATASOURCE);
+ this.timestampColumn = config.hasPath(TIMESTAMP_COLUMN) ?
config.getString(TIMESTAMP_COLUMN) : null;
+ this.timestampFormat = config.hasPath(TIMESTAMP_FORMAT) ?
config.getString(TIMESTAMP_FORMAT) : null;
}
}