This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a513c495e3 [Improve][Connector-V2] Support maxcompute sink writer with
timestamp field type (#9234)
a513c495e3 is described below
commit a513c495e38fe83866c9ce3829662222a928b324
Author: dy102 <[email protected]>
AuthorDate: Mon May 26 11:42:11 2025 +0900
[Improve][Connector-V2] Support maxcompute sink writer with timestamp field
type (#9234)
---
docs/en/connector-v2/sink/Maxcompute.md | 20 +++++++
.../api/options/ConnectorCommonOptions.java | 2 +
.../seatunnel/api/options/table/FormatOptions.java | 70 ++++++++++++++++++++++
.../seatunnel/file/config/BaseFileSinkConfig.java | 12 ++--
.../seatunnel/file/config/FileBaseOptions.java | 21 -------
.../seatunnel/file/config/FileBaseSinkOptions.java | 20 -------
.../file/source/reader/CsvReadStrategy.java | 21 ++++---
.../file/source/reader/ExcelReadStrategy.java | 14 +++--
.../file/source/reader/TextReadStrategy.java | 21 ++++---
.../file/source/reader/XmlReadStrategy.java | 7 ++-
.../file/cos/sink/CosFileSinkFactory.java | 6 +-
.../file/cos/source/CosFileSourceFactory.java | 6 +-
.../file/ftp/sink/FtpFileSinkFactory.java | 6 +-
.../file/ftp/source/FtpFileSourceFactory.java | 6 +-
.../file/hdfs/sink/HdfsFileSinkFactory.java | 6 +-
.../file/hdfs/source/HdfsFileSourceFactory.java | 6 +-
.../file/oss/jindo/sink/OssFileSinkFactory.java | 6 +-
.../oss/jindo/source/OssFileSourceFactory.java | 6 +-
.../file/local/sink/LocalFileSinkFactory.java | 6 +-
.../file/local/source/LocalFileSourceFactory.java | 6 +-
.../file/obs/sink/ObsFileSinkFactory.java | 6 +-
.../file/obs/source/ObsFileSourceFactory.java | 6 +-
.../file/oss/sink/OssFileSinkFactory.java | 6 +-
.../file/oss/source/OssFileSourceFactory.java | 6 +-
.../seatunnel/file/s3/sink/S3FileSinkFactory.java | 6 +-
.../file/s3/source/S3FileSourceFactory.java | 6 +-
.../file/sftp/sink/SftpFileSinkFactory.java | 6 +-
.../file/sftp/source/SftpFileSourceFactory.java | 6 +-
.../maxcompute/sink/MaxcomputeSinkFactory.java | 2 +
.../maxcompute/sink/MaxcomputeWriter.java | 9 ++-
.../maxcompute/util/FormatterContext.java | 48 +++++++++++++++
.../maxcompute/util/MaxcomputeTypeMapper.java | 24 ++++++--
.../maxcompute/BasicTypeToOdpsTypeTest.java | 61 ++++++++++++++++++-
33 files changed, 324 insertions(+), 136 deletions(-)
diff --git a/docs/en/connector-v2/sink/Maxcompute.md
b/docs/en/connector-v2/sink/Maxcompute.md
index fec35976f0..ed268102f4 100644
--- a/docs/en/connector-v2/sink/Maxcompute.md
+++ b/docs/en/connector-v2/sink/Maxcompute.md
@@ -112,6 +112,26 @@ Option introduction:
When data_save_mode selects CUSTOM_PROCESSING, you should fill in the
CUSTOM_SQL parameter. This parameter usually fills in a SQL that can be
executed. SQL will be executed before synchronization tasks.
+### datetime_format[String]
+
+User-defined format string used to convert LocalDateTime fields to strings.
+
+Use this option to specify the datetime format. The value must match one of the predefined patterns in `DateTimeUtils.Formatter` (e.g. `yyyy-MM-dd HH:mm:ss`, `yyyyMMddHHmmss`).
+
+Example values:
+
+- `yyyy-MM-dd HH:mm:ss`
+- `yyyy-MM-dd HH:mm:ss.SSSSSS`
+- `yyyy.MM.dd HH:mm:ss`
+- `yyyy/MM/dd HH:mm:ss`
+- `yyyy/M/d HH:mm`
+- `yyyy-M-d HH:mm`
+- `yyyy/M/d HH:mm:ss`
+- `yyyy-M-d HH:mm:ss`
+- `yyyyMMddHHmmss`
+
+Default: `yyyy-MM-dd HH:mm:ss`
+
### common options
Sink plugin common parameters, please refer to [Sink Common
Options](../sink-common-options.md) for details.
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/ConnectorCommonOptions.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/ConnectorCommonOptions.java
index 2c1fc27887..49b89b45a6 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/ConnectorCommonOptions.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/ConnectorCommonOptions.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.options.table.CatalogOptions;
import org.apache.seatunnel.api.options.table.ColumnOptions;
import org.apache.seatunnel.api.options.table.ConstraintKeyOptions;
import org.apache.seatunnel.api.options.table.FieldOptions;
+import org.apache.seatunnel.api.options.table.FormatOptions;
import org.apache.seatunnel.api.options.table.PrimaryKeyOptions;
import org.apache.seatunnel.api.options.table.TableIdentifierOptions;
import org.apache.seatunnel.api.options.table.TableSchemaOptions;
@@ -38,6 +39,7 @@ public class ConnectorCommonOptions
ColumnOptions,
PrimaryKeyOptions,
ConstraintKeyOptions,
+ FormatOptions,
Serializable {
public static Option<String> PLUGIN_NAME =
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/table/FormatOptions.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/table/FormatOptions.java
new file mode 100644
index 0000000000..5c0647a353
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/table/FormatOptions.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.options.table;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.common.utils.DateTimeUtils;
+import org.apache.seatunnel.common.utils.DateUtils;
+import org.apache.seatunnel.common.utils.TimeUtils;
+
+public interface FormatOptions {
+ Option<DateUtils.Formatter> DATE_FORMAT_LEGACY =
+ Options.key("date_format")
+ .enumType(DateUtils.Formatter.class)
+ .defaultValue(DateUtils.Formatter.YYYY_MM_DD)
+ .withDescription("Date format");
+
+ Option<DateTimeUtils.Formatter> DATETIME_FORMAT_LEGACY =
+ Options.key("datetime_format")
+ .enumType(DateTimeUtils.Formatter.class)
+ .defaultValue(DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS)
+ .withDescription("Datetime format");
+
+ Option<TimeUtils.Formatter> TIME_FORMAT_LEGACY =
+ Options.key("time_format")
+ .enumType(TimeUtils.Formatter.class)
+ .defaultValue(TimeUtils.Formatter.HH_MM_SS)
+ .withDescription("Time format");
+
+ // Not used yet. Reserved for future use to support custom date/time format strings.
+ Option<String> DATE_FORMAT =
+ Options.key("date_format")
+ .stringType()
+ .defaultValue("yyyy-MM-dd")
+ .withDescription(
+ "Date format string (e.g. 'yyyy-MM-dd'). "
+ + "Must match one of the predefined values
in the Formatter enum.");
+
+ Option<String> DATETIME_FORMAT =
+ Options.key("datetime_format")
+ .stringType()
+ .defaultValue("yyyy-MM-dd HH:mm:ss")
+ .withDescription(
+ "Datetime format string (e.g. 'yyyy-MM-dd
HH:mm:ss'). "
+ + "Must match one of the predefined values
in the Formatter enum.");
+
+ // Not used yet. Reserved for future use to support custom date/time format strings.
+ Option<String> TIME_FORMAT =
+ Options.key("time_format")
+ .stringType()
+ .defaultValue("HH:mm:ss")
+ .withDescription(
+ "Time format string (e.g. 'HH:mm:ss'). "
+ + "Must match one of the predefined values
in the Formatter enum.");
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
index 653f0c1268..64bc0538c4 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
@@ -115,22 +115,22 @@ public class BaseFileSinkConfig implements
DelimiterConfig, Serializable {
this.filenameExtension =
config.getString(FileBaseSinkOptions.FILENAME_EXTENSION.key());
}
- if (config.hasPath(FileBaseSinkOptions.DATE_FORMAT.key())) {
+ if (config.hasPath(FileBaseSinkOptions.DATE_FORMAT_LEGACY.key())) {
dateFormat =
DateUtils.Formatter.parse(
-
config.getString(FileBaseSinkOptions.DATE_FORMAT.key()));
+
config.getString(FileBaseSinkOptions.DATE_FORMAT_LEGACY.key()));
}
- if (config.hasPath(FileBaseSinkOptions.DATETIME_FORMAT.key())) {
+ if (config.hasPath(FileBaseSinkOptions.DATETIME_FORMAT_LEGACY.key())) {
datetimeFormat =
DateTimeUtils.Formatter.parse(
-
config.getString(FileBaseSinkOptions.DATETIME_FORMAT.key()));
+
config.getString(FileBaseSinkOptions.DATETIME_FORMAT_LEGACY.key()));
}
- if (config.hasPath(FileBaseSinkOptions.TIME_FORMAT.key())) {
+ if (config.hasPath(FileBaseSinkOptions.TIME_FORMAT_LEGACY.key())) {
timeFormat =
TimeUtils.Formatter.parse(
-
config.getString(FileBaseSinkOptions.TIME_FORMAT.key()));
+
config.getString(FileBaseSinkOptions.TIME_FORMAT_LEGACY.key()));
}
if (config.hasPath(FileBaseSinkOptions.ENABLE_HEADER_WRITE.key())) {
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseOptions.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseOptions.java
index 176cdb53bb..2f6728c6d4 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseOptions.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseOptions.java
@@ -20,9 +20,6 @@ package org.apache.seatunnel.connectors.seatunnel.file.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.options.ConnectorCommonOptions;
-import org.apache.seatunnel.common.utils.DateTimeUtils;
-import org.apache.seatunnel.common.utils.DateUtils;
-import org.apache.seatunnel.common.utils.TimeUtils;
import java.util.List;
@@ -47,24 +44,6 @@ public class FileBaseOptions extends ConnectorCommonOptions {
.defaultValue("UTF-8")
.withDescription("The encoding of the file, e.g. UTF-8,
ISO-8859-1....");
- public static final Option<DateUtils.Formatter> DATE_FORMAT =
- Options.key("date_format")
- .enumType(DateUtils.Formatter.class)
- .defaultValue(DateUtils.Formatter.YYYY_MM_DD)
- .withDescription("Date format");
-
- public static final Option<DateTimeUtils.Formatter> DATETIME_FORMAT =
- Options.key("datetime_format")
- .enumType(DateTimeUtils.Formatter.class)
- .defaultValue(DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS)
- .withDescription("Datetime format");
-
- public static final Option<TimeUtils.Formatter> TIME_FORMAT =
- Options.key("time_format")
- .enumType(TimeUtils.Formatter.class)
- .defaultValue(TimeUtils.Formatter.HH_MM_SS)
- .withDescription("Time format");
-
public static final Option<Boolean> PARSE_PARTITION_FROM_PATH =
Options.key("parse_partition_from_path")
.booleanType()
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseSinkOptions.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseSinkOptions.java
index e020b99e1c..1f385e54cf 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseSinkOptions.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseSinkOptions.java
@@ -21,9 +21,7 @@ import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
-import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
-import org.apache.seatunnel.common.utils.TimeUtils;
import org.apache.seatunnel.format.csv.constant.CsvStringQuoteMode;
import org.apache.seatunnel.format.text.constant.TextFormatConstant;
@@ -97,24 +95,6 @@ public class FileBaseSinkOptions extends FileBaseOptions {
.defaultValue(CompressFormat.NONE)
.withDescription("Orc file supported compression");
- public static final Option<DateUtils.Formatter> DATE_FORMAT =
- Options.key("date_format")
- .enumType(DateUtils.Formatter.class)
- .defaultValue(DateUtils.Formatter.YYYY_MM_DD)
- .withDescription("Date format");
-
- public static final Option<DateTimeUtils.Formatter> DATETIME_FORMAT =
- Options.key("datetime_format")
- .enumType(DateTimeUtils.Formatter.class)
- .defaultValue(DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS)
- .withDescription("Datetime format");
-
- public static final Option<TimeUtils.Formatter> TIME_FORMAT =
- Options.key("time_format")
- .enumType(TimeUtils.Formatter.class)
- .defaultValue(TimeUtils.Formatter.HH_MM_SS)
- .withDescription("Time format");
-
public static final Option<String> FILE_PATH =
Options.key("path")
.stringType()
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
index babecc069f..dc204ae62e 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
@@ -57,10 +57,12 @@ import java.util.stream.Collectors;
@Slf4j
public class CsvReadStrategy extends AbstractReadStrategy {
private CsvDeserializationSchema deserializationSchema;
- private DateUtils.Formatter dateFormat =
FileBaseSourceOptions.DATE_FORMAT.defaultValue();
+ private DateUtils.Formatter dateFormat =
+ FileBaseSourceOptions.DATE_FORMAT_LEGACY.defaultValue();
private DateTimeUtils.Formatter datetimeFormat =
- FileBaseSourceOptions.DATETIME_FORMAT.defaultValue();
- private TimeUtils.Formatter timeFormat =
FileBaseSourceOptions.TIME_FORMAT.defaultValue();
+ FileBaseSourceOptions.DATETIME_FORMAT_LEGACY.defaultValue();
+ private TimeUtils.Formatter timeFormat =
+ FileBaseSourceOptions.TIME_FORMAT_LEGACY.defaultValue();
private CompressFormat compressFormat =
FileBaseSourceOptions.COMPRESS_CODEC.defaultValue();
private CsvLineProcessor processor;
private int[] indexes;
@@ -264,20 +266,21 @@ public class CsvReadStrategy extends AbstractReadStrategy
{
}
private void initFormatter() {
- if (pluginConfig.hasPath(FileBaseSourceOptions.DATE_FORMAT.key())) {
+ if
(pluginConfig.hasPath(FileBaseSourceOptions.DATE_FORMAT_LEGACY.key())) {
dateFormat =
DateUtils.Formatter.parse(
-
pluginConfig.getString(FileBaseSourceOptions.DATE_FORMAT.key()));
+
pluginConfig.getString(FileBaseSourceOptions.DATE_FORMAT_LEGACY.key()));
}
- if (pluginConfig.hasPath(FileBaseSourceOptions.DATETIME_FORMAT.key()))
{
+ if
(pluginConfig.hasPath(FileBaseSourceOptions.DATETIME_FORMAT_LEGACY.key())) {
datetimeFormat =
DateTimeUtils.Formatter.parse(
-
pluginConfig.getString(FileBaseSourceOptions.DATETIME_FORMAT.key()));
+ pluginConfig.getString(
+
FileBaseSourceOptions.DATETIME_FORMAT_LEGACY.key()));
}
- if (pluginConfig.hasPath(FileBaseSourceOptions.TIME_FORMAT.key())) {
+ if
(pluginConfig.hasPath(FileBaseSourceOptions.TIME_FORMAT_LEGACY.key())) {
timeFormat =
TimeUtils.Formatter.parse(
-
pluginConfig.getString(FileBaseSourceOptions.TIME_FORMAT.key()));
+
pluginConfig.getString(FileBaseSourceOptions.TIME_FORMAT_LEGACY.key()));
}
if (pluginConfig.hasPath(FileBaseSourceOptions.COMPRESS_CODEC.key())) {
String compressCodec =
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
index 7bbccf56f1..35106f9866 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
@@ -96,15 +96,17 @@ public class ExcelReadStrategy extends AbstractReadStrategy
{
"Skip the number of rows exceeds the maximum or minimum
limit of Sheet");
}
- if (pluginConfig.hasPath(FileBaseSourceOptions.DATE_FORMAT.key())) {
- dateFormatterPattern =
pluginConfig.getString(FileBaseSourceOptions.DATE_FORMAT.key());
+ if
(pluginConfig.hasPath(FileBaseSourceOptions.DATE_FORMAT_LEGACY.key())) {
+ dateFormatterPattern =
+
pluginConfig.getString(FileBaseSourceOptions.DATE_FORMAT_LEGACY.key());
}
- if (pluginConfig.hasPath(FileBaseSourceOptions.DATETIME_FORMAT.key()))
{
+ if
(pluginConfig.hasPath(FileBaseSourceOptions.DATETIME_FORMAT_LEGACY.key())) {
dateTimeFormatterPattern =
-
pluginConfig.getString(FileBaseSourceOptions.DATETIME_FORMAT.key());
+
pluginConfig.getString(FileBaseSourceOptions.DATETIME_FORMAT_LEGACY.key());
}
- if (pluginConfig.hasPath(FileBaseSourceOptions.TIME_FORMAT.key())) {
- timeFormatterPattern =
pluginConfig.getString(FileBaseSourceOptions.TIME_FORMAT.key());
+ if
(pluginConfig.hasPath(FileBaseSourceOptions.TIME_FORMAT_LEGACY.key())) {
+ timeFormatterPattern =
+
pluginConfig.getString(FileBaseSourceOptions.TIME_FORMAT_LEGACY.key());
}
ExcelCellUtils excelCellUtils =
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
index 9bf3d6afc4..9e72143b4c 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
@@ -54,10 +54,12 @@ import java.util.Optional;
public class TextReadStrategy extends AbstractReadStrategy {
private DeserializationSchema<SeaTunnelRow> deserializationSchema;
private String fieldDelimiter =
FileBaseSourceOptions.FIELD_DELIMITER.defaultValue();
- private DateUtils.Formatter dateFormat =
FileBaseSourceOptions.DATE_FORMAT.defaultValue();
+ private DateUtils.Formatter dateFormat =
+ FileBaseSourceOptions.DATE_FORMAT_LEGACY.defaultValue();
private DateTimeUtils.Formatter datetimeFormat =
- FileBaseSourceOptions.DATETIME_FORMAT.defaultValue();
- private TimeUtils.Formatter timeFormat =
FileBaseSourceOptions.TIME_FORMAT.defaultValue();
+ FileBaseSourceOptions.DATETIME_FORMAT_LEGACY.defaultValue();
+ private TimeUtils.Formatter timeFormat =
+ FileBaseSourceOptions.TIME_FORMAT_LEGACY.defaultValue();
private CompressFormat compressFormat =
FileBaseSourceOptions.COMPRESS_CODEC.defaultValue();
private TextLineSplitor textLineSplitor;
private int[] indexes;
@@ -223,20 +225,21 @@ public class TextReadStrategy extends
AbstractReadStrategy {
}
private void initFormatter() {
- if (pluginConfig.hasPath(FileBaseSourceOptions.DATE_FORMAT.key())) {
+ if
(pluginConfig.hasPath(FileBaseSourceOptions.DATE_FORMAT_LEGACY.key())) {
dateFormat =
DateUtils.Formatter.parse(
-
pluginConfig.getString(FileBaseSourceOptions.DATE_FORMAT.key()));
+
pluginConfig.getString(FileBaseSourceOptions.DATE_FORMAT_LEGACY.key()));
}
- if (pluginConfig.hasPath(FileBaseSourceOptions.DATETIME_FORMAT.key()))
{
+ if
(pluginConfig.hasPath(FileBaseSourceOptions.DATETIME_FORMAT_LEGACY.key())) {
datetimeFormat =
DateTimeUtils.Formatter.parse(
-
pluginConfig.getString(FileBaseSourceOptions.DATETIME_FORMAT.key()));
+ pluginConfig.getString(
+
FileBaseSourceOptions.DATETIME_FORMAT_LEGACY.key()));
}
- if (pluginConfig.hasPath(FileBaseSourceOptions.TIME_FORMAT.key())) {
+ if
(pluginConfig.hasPath(FileBaseSourceOptions.TIME_FORMAT_LEGACY.key())) {
timeFormat =
TimeUtils.Formatter.parse(
-
pluginConfig.getString(FileBaseSourceOptions.TIME_FORMAT.key()));
+
pluginConfig.getString(FileBaseSourceOptions.TIME_FORMAT_LEGACY.key()));
}
if (pluginConfig.hasPath(FileBaseSourceOptions.COMPRESS_CODEC.key())) {
String compressCodec =
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java
index 1503d8fbac..b9fa3d7776 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java
@@ -282,13 +282,14 @@ public class XmlReadStrategy extends AbstractReadStrategy
{
this.dateFormat =
getComplexDateConfigValue(
- FileBaseSourceOptions.DATE_FORMAT,
DateUtils.Formatter::parse);
+ FileBaseSourceOptions.DATE_FORMAT_LEGACY,
DateUtils.Formatter::parse);
this.timeFormat =
getComplexDateConfigValue(
- FileBaseSourceOptions.TIME_FORMAT,
TimeUtils.Formatter::parse);
+ FileBaseSourceOptions.TIME_FORMAT_LEGACY,
TimeUtils.Formatter::parse);
this.datetimeFormat =
getComplexDateConfigValue(
- FileBaseSourceOptions.DATETIME_FORMAT,
DateTimeUtils.Formatter::parse);
+ FileBaseSourceOptions.DATETIME_FORMAT_LEGACY,
+ DateTimeUtils.Formatter::parse);
this.encoding =
ReadonlyConfig.fromConfig(pluginConfig)
.getOptional(FileBaseSourceOptions.ENCODING)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSinkFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSinkFactory.java
index 3dd6a77019..bdf82d54f9 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSinkFactory.java
@@ -100,9 +100,9 @@ public class CosFileSinkFactory implements TableSinkFactory
{
FileBaseSinkOptions.ENCODING)
.optional(FileBaseSinkOptions.SINK_COLUMNS)
.optional(FileBaseSinkOptions.IS_ENABLE_TRANSACTION)
- .optional(FileBaseSinkOptions.DATE_FORMAT)
- .optional(FileBaseSinkOptions.DATETIME_FORMAT)
- .optional(FileBaseSinkOptions.TIME_FORMAT)
+ .optional(FileBaseSinkOptions.DATE_FORMAT_LEGACY)
+ .optional(FileBaseSinkOptions.DATETIME_FORMAT_LEGACY)
+ .optional(FileBaseSinkOptions.TIME_FORMAT_LEGACY)
.optional(FileBaseSinkOptions.SINGLE_FILE_MODE)
.optional(FileBaseSinkOptions.BATCH_SIZE)
.optional(FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java
index 0e5a83f86a..a9a628017c 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java
@@ -81,9 +81,9 @@ public class CosFileSourceFactory implements
TableSourceFactory {
FileFormat.TEXT, FileFormat.JSON,
FileFormat.CSV, FileFormat.XML),
FileBaseSourceOptions.ENCODING)
.optional(FileBaseSourceOptions.PARSE_PARTITION_FROM_PATH)
- .optional(FileBaseSourceOptions.DATE_FORMAT)
- .optional(FileBaseSourceOptions.DATETIME_FORMAT)
- .optional(FileBaseSourceOptions.TIME_FORMAT)
+ .optional(FileBaseSourceOptions.DATE_FORMAT_LEGACY)
+ .optional(FileBaseSourceOptions.DATETIME_FORMAT_LEGACY)
+ .optional(FileBaseSourceOptions.TIME_FORMAT_LEGACY)
.optional(FileBaseSourceOptions.FILE_FILTER_PATTERN)
.optional(FileBaseSourceOptions.COMPRESS_CODEC)
.optional(FileBaseSourceOptions.ARCHIVE_COMPRESS_CODEC)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
index e2a6e8dda8..38d578fdff 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
@@ -113,9 +113,9 @@ public class FtpFileSinkFactory extends
BaseMultipleTableFileSinkFactory {
FileBaseSinkOptions.ENCODING)
.optional(FileBaseSinkOptions.SINK_COLUMNS)
.optional(FileBaseSinkOptions.IS_ENABLE_TRANSACTION)
- .optional(FileBaseSinkOptions.DATE_FORMAT)
- .optional(FileBaseSinkOptions.DATETIME_FORMAT)
- .optional(FileBaseSinkOptions.TIME_FORMAT)
+ .optional(FileBaseSinkOptions.DATE_FORMAT_LEGACY)
+ .optional(FileBaseSinkOptions.DATETIME_FORMAT_LEGACY)
+ .optional(FileBaseSinkOptions.TIME_FORMAT_LEGACY)
.optional(FtpFileSinkOptions.FTP_CONNECTION_MODE)
.optional(FileBaseSinkOptions.SINGLE_FILE_MODE)
.optional(FileBaseSinkOptions.BATCH_SIZE)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
index 55ee74291f..c8696c7687 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
@@ -86,9 +86,9 @@ public class FtpFileSourceFactory implements
TableSourceFactory {
FileFormat.TEXT, FileFormat.JSON,
FileFormat.CSV, FileFormat.XML),
FileBaseSourceOptions.ENCODING)
.optional(FileBaseSourceOptions.PARSE_PARTITION_FROM_PATH)
- .optional(FileBaseSourceOptions.DATE_FORMAT)
- .optional(FileBaseSourceOptions.DATETIME_FORMAT)
- .optional(FileBaseSourceOptions.TIME_FORMAT)
+ .optional(FileBaseSourceOptions.DATE_FORMAT_LEGACY)
+ .optional(FileBaseSourceOptions.DATETIME_FORMAT_LEGACY)
+ .optional(FileBaseSourceOptions.TIME_FORMAT_LEGACY)
.optional(FileBaseSourceOptions.FILE_FILTER_PATTERN)
.optional(FileBaseSourceOptions.COMPRESS_CODEC)
.optional(FtpFileSourceOptions.FTP_CONNECTION_MODE)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
index 540bc8168e..e7ecd250d2 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
@@ -96,9 +96,9 @@ public class HdfsFileSinkFactory implements TableSinkFactory {
FileBaseSinkOptions.ENCODING)
.optional(FileBaseSinkOptions.SINK_COLUMNS)
.optional(FileBaseSinkOptions.IS_ENABLE_TRANSACTION)
- .optional(FileBaseSinkOptions.DATE_FORMAT)
- .optional(FileBaseSinkOptions.DATETIME_FORMAT)
- .optional(FileBaseSinkOptions.TIME_FORMAT)
+ .optional(FileBaseSinkOptions.DATE_FORMAT_LEGACY)
+ .optional(FileBaseSinkOptions.DATETIME_FORMAT_LEGACY)
+ .optional(FileBaseSinkOptions.TIME_FORMAT_LEGACY)
.optional(FileBaseSinkOptions.SINGLE_FILE_MODE)
.optional(FileBaseSinkOptions.BATCH_SIZE)
.optional(FileBaseSinkOptions.HDFS_SITE_PATH)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
index 95668144f9..3abf72c7c8 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
@@ -73,9 +73,9 @@ public class HdfsFileSourceFactory implements
TableSourceFactory {
FileFormat.TEXT, FileFormat.JSON,
FileFormat.CSV, FileFormat.XML),
FileBaseSourceOptions.ENCODING)
.optional(FileBaseSourceOptions.PARSE_PARTITION_FROM_PATH)
- .optional(FileBaseSourceOptions.DATE_FORMAT)
- .optional(FileBaseSourceOptions.DATETIME_FORMAT)
- .optional(FileBaseSourceOptions.TIME_FORMAT)
+ .optional(FileBaseSourceOptions.DATE_FORMAT_LEGACY)
+ .optional(FileBaseSourceOptions.DATETIME_FORMAT_LEGACY)
+ .optional(FileBaseSourceOptions.TIME_FORMAT_LEGACY)
.optional(FileBaseSourceOptions.FILE_FILTER_PATTERN)
.optional(FileBaseSourceOptions.COMPRESS_CODEC)
.optional(FileBaseSourceOptions.ARCHIVE_COMPRESS_CODEC)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSinkFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSinkFactory.java
index 37cb9ccff2..41ced858e8 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSinkFactory.java
@@ -100,9 +100,9 @@ public class OssFileSinkFactory implements TableSinkFactory
{
FileBaseSinkOptions.ENCODING)
.optional(FileBaseSinkOptions.SINK_COLUMNS)
.optional(FileBaseSinkOptions.IS_ENABLE_TRANSACTION)
- .optional(FileBaseSinkOptions.DATE_FORMAT)
- .optional(FileBaseSinkOptions.DATETIME_FORMAT)
- .optional(FileBaseSinkOptions.TIME_FORMAT)
+ .optional(FileBaseSinkOptions.DATE_FORMAT_LEGACY)
+ .optional(FileBaseSinkOptions.DATETIME_FORMAT_LEGACY)
+ .optional(FileBaseSinkOptions.TIME_FORMAT_LEGACY)
.optional(FileBaseSinkOptions.SINGLE_FILE_MODE)
.optional(FileBaseSinkOptions.BATCH_SIZE)
.optional(FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/source/OssFileSourceFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/source/OssFileSourceFactory.java
index 6a6c09913e..f826c8891b 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/source/OssFileSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/source/OssFileSourceFactory.java
@@ -77,9 +77,9 @@ public class OssFileSourceFactory implements
TableSourceFactory {
FileFormat.TEXT, FileFormat.JSON,
FileFormat.CSV, FileFormat.XML),
FileBaseSourceOptions.ENCODING)
.optional(FileBaseSourceOptions.PARSE_PARTITION_FROM_PATH)
- .optional(FileBaseSourceOptions.DATE_FORMAT)
- .optional(FileBaseSourceOptions.DATETIME_FORMAT)
- .optional(FileBaseSourceOptions.TIME_FORMAT)
+ .optional(FileBaseSourceOptions.DATE_FORMAT_LEGACY)
+ .optional(FileBaseSourceOptions.DATETIME_FORMAT_LEGACY)
+ .optional(FileBaseSourceOptions.TIME_FORMAT_LEGACY)
.optional(FileBaseSourceOptions.FILE_FILTER_PATTERN)
.optional(FileBaseSourceOptions.COMPRESS_CODEC)
.optional(FileBaseSourceOptions.NULL_FORMAT)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
index d27171b86c..69abdf7fba 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
@@ -106,9 +106,9 @@ public class LocalFileSinkFactory extends
BaseMultipleTableFileSinkFactory {
FileBaseSinkOptions.ENCODING)
.optional(FileBaseSinkOptions.SINK_COLUMNS)
.optional(FileBaseSinkOptions.IS_ENABLE_TRANSACTION)
- .optional(FileBaseSinkOptions.DATE_FORMAT)
- .optional(FileBaseSinkOptions.DATETIME_FORMAT)
- .optional(FileBaseSinkOptions.TIME_FORMAT)
+ .optional(FileBaseSinkOptions.DATE_FORMAT_LEGACY)
+ .optional(FileBaseSinkOptions.DATETIME_FORMAT_LEGACY)
+ .optional(FileBaseSinkOptions.TIME_FORMAT_LEGACY)
.optional(FileBaseSinkOptions.SINGLE_FILE_MODE)
.optional(FileBaseSinkOptions.BATCH_SIZE)
.optional(FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
index b88f624367..cb21ff360c 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
@@ -84,9 +84,9 @@ public class LocalFileSourceFactory implements
TableSourceFactory {
FileFormat.TEXT, FileFormat.JSON,
FileFormat.CSV, FileFormat.XML),
FileBaseSourceOptions.ENCODING)
.optional(FileBaseSourceOptions.PARSE_PARTITION_FROM_PATH)
- .optional(FileBaseSourceOptions.DATE_FORMAT)
- .optional(FileBaseSourceOptions.DATETIME_FORMAT)
- .optional(FileBaseSourceOptions.TIME_FORMAT)
+ .optional(FileBaseSourceOptions.DATE_FORMAT_LEGACY)
+ .optional(FileBaseSourceOptions.DATETIME_FORMAT_LEGACY)
+ .optional(FileBaseSourceOptions.TIME_FORMAT_LEGACY)
.optional(FileBaseSourceOptions.FILE_FILTER_PATTERN)
.optional(FileBaseSourceOptions.COMPRESS_CODEC)
.optional(FileBaseSourceOptions.ARCHIVE_COMPRESS_CODEC)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSinkFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSinkFactory.java
index 54f7586f28..a1cf8354ae 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSinkFactory.java
@@ -83,9 +83,9 @@ public class ObsFileSinkFactory implements TableSinkFactory {
FileBaseSinkOptions.IS_PARTITION_FIELD_WRITE_IN_FILE)
.optional(FileBaseSinkOptions.SINK_COLUMNS)
.optional(FileBaseSinkOptions.IS_ENABLE_TRANSACTION)
- .optional(FileBaseSinkOptions.DATE_FORMAT)
- .optional(FileBaseSinkOptions.DATETIME_FORMAT)
- .optional(FileBaseSinkOptions.TIME_FORMAT)
+ .optional(FileBaseSinkOptions.DATE_FORMAT_LEGACY)
+ .optional(FileBaseSinkOptions.DATETIME_FORMAT_LEGACY)
+ .optional(FileBaseSinkOptions.TIME_FORMAT_LEGACY)
.optional(FileBaseSinkOptions.SINGLE_FILE_MODE)
.optional(FileBaseSinkOptions.BATCH_SIZE)
.optional(FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSourceFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSourceFactory.java
index 0da2d507b1..db010ec98f 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSourceFactory.java
@@ -68,9 +68,9 @@ public class ObsFileSourceFactory implements
TableSourceFactory {
FileFormat.TEXT, FileFormat.JSON,
FileFormat.CSV, FileFormat.XML),
FileBaseSourceOptions.ENCODING)
.optional(FileBaseSourceOptions.PARSE_PARTITION_FROM_PATH)
- .optional(FileBaseSourceOptions.DATE_FORMAT)
- .optional(FileBaseSourceOptions.DATETIME_FORMAT)
- .optional(FileBaseSourceOptions.TIME_FORMAT)
+ .optional(FileBaseSourceOptions.DATE_FORMAT_LEGACY)
+ .optional(FileBaseSourceOptions.DATETIME_FORMAT_LEGACY)
+ .optional(FileBaseSourceOptions.TIME_FORMAT_LEGACY)
.optional(FileBaseSourceOptions.NULL_FORMAT)
.optional(FileBaseSourceOptions.FILENAME_EXTENSION)
.optional(FileBaseSourceOptions.READ_COLUMNS)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
index 056c8767a9..ff7e7348a0 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
@@ -114,9 +114,9 @@ public class OssFileSinkFactory extends
BaseMultipleTableFileSinkFactory {
FileBaseSinkOptions.ENCODING)
.optional(FileBaseSinkOptions.SINK_COLUMNS)
.optional(FileBaseSinkOptions.IS_ENABLE_TRANSACTION)
- .optional(FileBaseSinkOptions.DATE_FORMAT)
- .optional(FileBaseSinkOptions.DATETIME_FORMAT)
- .optional(FileBaseSinkOptions.TIME_FORMAT)
+ .optional(FileBaseSinkOptions.DATE_FORMAT_LEGACY)
+ .optional(FileBaseSinkOptions.DATETIME_FORMAT_LEGACY)
+ .optional(FileBaseSinkOptions.TIME_FORMAT_LEGACY)
.optional(FileBaseSinkOptions.SINGLE_FILE_MODE)
.optional(FileBaseSinkOptions.BATCH_SIZE)
.optional(FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
index eca8bfb908..4d06977035 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
@@ -87,9 +87,9 @@ public class OssFileSourceFactory implements
TableSourceFactory {
FileFormat.TEXT, FileFormat.JSON,
FileFormat.CSV, FileFormat.XML),
FileBaseSourceOptions.ENCODING)
.optional(FileBaseSourceOptions.PARSE_PARTITION_FROM_PATH)
- .optional(FileBaseSourceOptions.DATE_FORMAT)
- .optional(FileBaseSourceOptions.DATETIME_FORMAT)
- .optional(FileBaseSourceOptions.TIME_FORMAT)
+ .optional(FileBaseSourceOptions.DATE_FORMAT_LEGACY)
+ .optional(FileBaseSourceOptions.DATETIME_FORMAT_LEGACY)
+ .optional(FileBaseSourceOptions.TIME_FORMAT_LEGACY)
.optional(FileBaseSourceOptions.FILE_FILTER_PATTERN)
.optional(FileBaseSourceOptions.COMPRESS_CODEC)
.optional(FileBaseSourceOptions.ARCHIVE_COMPRESS_CODEC)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
index b052f97399..28c6390186 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
@@ -111,9 +111,9 @@ public class S3FileSinkFactory implements TableSinkFactory {
FileBaseSinkOptions.ENCODING)
.optional(FileBaseSinkOptions.SINK_COLUMNS)
.optional(FileBaseSinkOptions.IS_ENABLE_TRANSACTION)
- .optional(FileBaseSinkOptions.DATE_FORMAT)
- .optional(FileBaseSinkOptions.DATETIME_FORMAT)
- .optional(FileBaseSinkOptions.TIME_FORMAT)
+ .optional(FileBaseSinkOptions.DATE_FORMAT_LEGACY)
+ .optional(FileBaseSinkOptions.DATETIME_FORMAT_LEGACY)
+ .optional(FileBaseSinkOptions.TIME_FORMAT_LEGACY)
.optional(FileBaseSinkOptions.SINGLE_FILE_MODE)
.optional(FileBaseSinkOptions.BATCH_SIZE)
.optional(FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
index 0be2ff50b5..a172b9126c 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
@@ -91,9 +91,9 @@ public class S3FileSourceFactory implements
TableSourceFactory {
FileFormat.TEXT, FileFormat.JSON,
FileFormat.CSV, FileFormat.XML),
FileBaseSourceOptions.ENCODING)
.optional(FileBaseSourceOptions.PARSE_PARTITION_FROM_PATH)
- .optional(FileBaseSourceOptions.DATE_FORMAT)
- .optional(FileBaseSourceOptions.DATETIME_FORMAT)
- .optional(FileBaseSourceOptions.TIME_FORMAT)
+ .optional(FileBaseSourceOptions.DATE_FORMAT_LEGACY)
+ .optional(FileBaseSourceOptions.DATETIME_FORMAT_LEGACY)
+ .optional(FileBaseSourceOptions.TIME_FORMAT_LEGACY)
.optional(FileBaseSourceOptions.FILE_FILTER_PATTERN)
.optional(FileBaseSourceOptions.COMPRESS_CODEC)
.optional(FileBaseSourceOptions.ARCHIVE_COMPRESS_CODEC)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
index 701a76dbb6..eb68f31158 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
@@ -111,9 +111,9 @@ public class SftpFileSinkFactory extends
BaseMultipleTableFileSinkFactory {
FileBaseSinkOptions.ENCODING)
.optional(FileBaseSinkOptions.SINK_COLUMNS)
.optional(FileBaseSinkOptions.IS_ENABLE_TRANSACTION)
- .optional(FileBaseSinkOptions.DATE_FORMAT)
- .optional(FileBaseSinkOptions.DATETIME_FORMAT)
- .optional(FileBaseSinkOptions.TIME_FORMAT)
+ .optional(FileBaseSinkOptions.DATE_FORMAT_LEGACY)
+ .optional(FileBaseSinkOptions.DATETIME_FORMAT_LEGACY)
+ .optional(FileBaseSinkOptions.TIME_FORMAT_LEGACY)
.optional(FileBaseSinkOptions.SINGLE_FILE_MODE)
.optional(FileBaseSinkOptions.BATCH_SIZE)
.optional(FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
index 7355b48af5..5391aa5d05 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
@@ -80,9 +80,9 @@ public class SftpFileSourceFactory implements
TableSourceFactory {
FileFormat.TEXT, FileFormat.JSON,
FileFormat.CSV, FileFormat.XML),
FileBaseSourceOptions.ENCODING)
.optional(FileBaseSourceOptions.PARSE_PARTITION_FROM_PATH)
- .optional(FileBaseSourceOptions.DATE_FORMAT)
- .optional(FileBaseSourceOptions.DATETIME_FORMAT)
- .optional(FileBaseSourceOptions.TIME_FORMAT)
+ .optional(FileBaseSourceOptions.DATE_FORMAT_LEGACY)
+ .optional(FileBaseSourceOptions.DATETIME_FORMAT_LEGACY)
+ .optional(FileBaseSourceOptions.TIME_FORMAT_LEGACY)
.optional(FileBaseSourceOptions.FILE_FILTER_PATTERN)
.optional(FileBaseSourceOptions.NULL_FORMAT)
.optional(FileBaseSourceOptions.FILENAME_EXTENSION)
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
index a89d4bc27b..1f1e7c0f1d 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.maxcompute.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.options.SinkConnectorCommonOptions;
+import org.apache.seatunnel.api.options.table.FormatOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.connector.TableSink;
@@ -52,6 +53,7 @@ public class MaxcomputeSinkFactory implements
TableSinkFactory {
MaxcomputeSinkOptions.DATA_SAVE_MODE,
MaxcomputeSinkOptions.SAVE_MODE_CREATE_TEMPLATE,
MaxcomputeSinkOptions.CUSTOM_SQL,
+ FormatOptions.DATETIME_FORMAT,
SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
index 423ead236f..e800a504c8 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.maxcompute.sink;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.options.table.FormatOptions;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -25,6 +26,7 @@ import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeSinkOptions;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.FormatterContext;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeTypeMapper;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil;
@@ -45,6 +47,7 @@ public class MaxcomputeWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
private final TableTunnel.UploadSession session;
private final TableSchema tableSchema;
private final SeaTunnelRowType rowType;
+ private final FormatterContext formatterContext;
public MaxcomputeWriter(ReadonlyConfig readonlyConfig, SeaTunnelRowType
rowType) {
try {
@@ -66,6 +69,10 @@ public class MaxcomputeWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
readonlyConfig.get(MaxcomputeSinkOptions.PROJECT),
readonlyConfig.get(MaxcomputeSinkOptions.TABLE_NAME));
}
+
+ this.formatterContext =
+ new FormatterContext(readonlyConfig.get(FormatOptions.DATETIME_FORMAT));
+
this.recordWriter = session.openBufferedWriter();
log.info("open record writer success");
} catch (Exception e) {
@@ -78,7 +85,7 @@ public class MaxcomputeWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
public void write(SeaTunnelRow seaTunnelRow) throws IOException {
Record record =
MaxcomputeTypeMapper.getMaxcomputeRowData(
- seaTunnelRow, this.tableSchema, this.rowType);
+ seaTunnelRow, this.tableSchema, this.rowType, formatterContext);
recordWriter.write(record);
}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/FormatterContext.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/FormatterContext.java
new file mode 100644
index 0000000000..e9033b9ab4
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/FormatterContext.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.util;
+
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.utils.DateTimeUtils;
+
+import java.time.LocalDateTime;
+
+public class FormatterContext {
+ private final DateTimeUtils.Formatter localDateTimeFormat;
+
+ public FormatterContext(String localDateTimeFormat) {
+ this.localDateTimeFormat = DateTimeUtils.Formatter.parse(localDateTimeFormat);
+ }
+
+ public boolean isDateTimeType(Object field) {
+ return field instanceof LocalDateTime;
+ }
+
+ public String formatDateTime(Object field) {
+ if (field instanceof LocalDateTime) {
+ return this.format(((LocalDateTime) field));
+ }
+ throw CommonError.illegalArgument(
+ field.getClass().getName(),
+ "Cannot format the given value: not a LocalDateTime instance.");
+ }
+
+ private String format(LocalDateTime localDateTime) {
+ return DateTimeUtils.toString(localDateTime, localDateTimeFormat);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
index 5e489bac7c..6b50f50431 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
@@ -69,7 +69,10 @@ public class MaxcomputeTypeMapper implements Serializable {
}
public static Record getMaxcomputeRowData(
- SeaTunnelRow seaTunnelRow, TableSchema tableSchema, SeaTunnelRowType rowType) {
+ SeaTunnelRow seaTunnelRow,
+ TableSchema tableSchema,
+ SeaTunnelRowType rowType,
+ FormatterContext formatterContext) {
ArrayRecord arrayRecord = new ArrayRecord(tableSchema);
for (int i = 0; i < seaTunnelRow.getFields().length; i++) {
String fieldName = rowType.getFieldName(i);
@@ -84,7 +87,8 @@ public class MaxcomputeTypeMapper implements Serializable {
arrayRecord.set(
tableSchema.getColumnIndex(fieldName),
- resolveObject2Maxcompute(seaTunnelRow.getField(i), column.getTypeInfo()));
+ resolveObject2Maxcompute(
+ seaTunnelRow.getField(i), column.getTypeInfo(), formatterContext));
}
return arrayRecord;
}
@@ -212,7 +216,8 @@ public class MaxcomputeTypeMapper implements Serializable {
}
}
- private static Object resolveObject2Maxcompute(Object field, TypeInfo typeInfo) {
+ private static Object resolveObject2Maxcompute(
+ Object field, TypeInfo typeInfo, FormatterContext formatterContext) {
if (field == null) {
return null;
}
@@ -243,15 +248,19 @@ public class MaxcomputeTypeMapper implements Serializable
{
origDataMap.forEach(
(key, value) ->
dataMap.put(
- resolveObject2Maxcompute(key, keyTypeInfo),
- resolveObject2Maxcompute(value, valueTypeInfo)));
+ resolveObject2Maxcompute(
+ key, keyTypeInfo, formatterContext),
+ resolveObject2Maxcompute(
+ value, valueTypeInfo, formatterContext)));
return origDataMap;
case STRUCT:
Object[] fields = ((SeaTunnelRow) field).getFields();
List<TypeInfo> typeInfos = ((StructTypeInfo)
typeInfo).getFieldTypeInfos();
ArrayList<Object> origStruct = new ArrayList<>();
for (int i = 0; i < fields.length; i++) {
- origStruct.add(resolveObject2Maxcompute(fields[i], typeInfos.get(i)));
+ origStruct.add(
+ resolveObject2Maxcompute(
+ fields[i], typeInfos.get(i), formatterContext));
}
return new SimpleStruct((StructTypeInfo) typeInfo, origStruct);
case TINYINT:
@@ -272,6 +281,9 @@ public class MaxcomputeTypeMapper implements Serializable {
case CHAR:
return new Char((String) field);
case STRING:
+ if (formatterContext.isDateTimeType(field)) {
+ return formatterContext.formatDateTime(field);
+ }
case JSON:
if (field instanceof byte[]) {
return new String((byte[]) field);
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/BasicTypeToOdpsTypeTest.java
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/BasicTypeToOdpsTypeTest.java
index 6ed5f79818..15d971f15e 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/BasicTypeToOdpsTypeTest.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/BasicTypeToOdpsTypeTest.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.FormatterContext;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeTypeMapper;
import org.junit.jupiter.api.Assertions;
@@ -34,9 +35,15 @@ import com.aliyun.odps.data.ArrayRecord;
import com.aliyun.odps.data.Record;
import lombok.SneakyThrows;
+import java.sql.Timestamp;
import java.time.LocalDate;
public class BasicTypeToOdpsTypeTest {
+ public static FormatterContext defaultFormatterContext =
+ new FormatterContext("yyyy-MM-dd HH:mm:ss");
+
+ public static FormatterContext customFormatterContext =
+ new FormatterContext("yyyy-MM-dd HH:mm:ss.SSSSSS");
private static void testType(
String fieldName,
@@ -57,7 +64,8 @@ public class BasicTypeToOdpsTypeTest {
SeaTunnelRow seaTunnelRow =
MaxcomputeTypeMapper.getSeaTunnelRowData(record, typeInfo);
Record tRecord =
- MaxcomputeTypeMapper.getMaxcomputeRowData(seaTunnelRow, tableSchema, typeInfo);
+ MaxcomputeTypeMapper.getMaxcomputeRowData(
+ seaTunnelRow, tableSchema, typeInfo, defaultFormatterContext);
for (int i = 0; i < tRecord.getColumns().length; i++) {
Assertions.assertEquals(record.get(i), tRecord.get(i));
@@ -111,4 +119,55 @@ public class BasicTypeToOdpsTypeTest {
void testDATE_TYPE_2_DATE() {
testType("DATE_TYPE_2_DATE", LocalTimeType.LOCAL_DATE_TYPE,
OdpsType.DATE, LocalDate.now());
}
+
+ @SneakyThrows
+ @Test
+ void testLOCAL_DATETIME_2_STRING() {
+ testTypeWithDifferentInputAndOutput(
+ "LOCAL_DATETIME_2_STRING",
+ OdpsType.TIMESTAMP,
+ LocalTimeType.LOCAL_DATE_TIME_TYPE,
+ OdpsType.STRING,
+ Timestamp.valueOf("2025-01-01 00:00:00"),
+ "2025-01-01 00:00:00",
+ defaultFormatterContext);
+
+ testTypeWithDifferentInputAndOutput(
+ "LOCAL_DATETIME_2_STRING",
+ OdpsType.TIMESTAMP,
+ LocalTimeType.LOCAL_DATE_TIME_TYPE,
+ OdpsType.STRING,
+ Timestamp.valueOf("2025-01-01 00:00:00"),
+ "2025-01-01 00:00:00.000000",
+ customFormatterContext);
+ }
+
+ private static void testTypeWithDifferentInputAndOutput(
+ String fieldName,
+ OdpsType inputOdpsType,
+ SeaTunnelDataType<?> seaTunnelDataType,
+ OdpsType outputOdpsType,
+ Object inputObject,
+ Object expectedObject,
+ FormatterContext formatterContext) {
+ Column inputColumn = new Column(fieldName, inputOdpsType);
+ ArrayRecord inputRecord = new ArrayRecord(new Column[] {inputColumn});
+ inputRecord.set(fieldName, inputObject);
+
+ SeaTunnelRowType typeInfo =
+ new SeaTunnelRowType(
+ new String[] {fieldName}, new SeaTunnelDataType<?>[] {seaTunnelDataType});
+
+ SeaTunnelRow seaTunnelRow = MaxcomputeTypeMapper.getSeaTunnelRowData(inputRecord, typeInfo);
+
+ Column outputColumn = new Column(fieldName, outputOdpsType);
+ TableSchema outputSchema = new TableSchema();
+ outputSchema.addColumn(outputColumn);
+
+ Record finalOutputRecord =
+ MaxcomputeTypeMapper.getMaxcomputeRowData(
+ seaTunnelRow, outputSchema, typeInfo, formatterContext);
+
+ Assertions.assertEquals(expectedObject, finalOutputRecord.get(fieldName));
+ }
}