This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4a1dd33621d [fix](iceberg) fix the iceberg timestamp_ntz write schema and values bug. (#51384)
4a1dd33621d is described below
commit 4a1dd33621de69333a34466d703988f16bf435fc
Author: kang <[email protected]>
AuthorDate: Sun Jun 15 09:41:28 2025 +0800
[fix](iceberg) fix the iceberg timestamp_ntz write schema and values bug. (#51384)
### What problem does this PR solve?
This PR fixes a time-zone bug when Doris writes to Iceberg tables in the
data lake scenario. Iceberg defines two timestamp types: timestamp_ltz
(with local time zone) and timestamp_ntz (without time zone). Doris
previously did not distinguish between them, so data it wrote to
timestamp_ntz columns read back with different values in Spark depending
on the session time zone. This PR resolves that problem.
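To make the two semantics concrete, a short Spark SQL sketch (the DDL
mirrors the run12.sql fixture added below; values and behavior notes are
illustrative of Iceberg semantics, not output from this patch):

    -- timestamp_ntz stores the wall-clock literal; reads ignore the session time zone
    CREATE TABLE demo.test_db.t_ntz_doris (col TIMESTAMP_NTZ) USING iceberg;
    -- timestamp (an ltz type in Iceberg) stores an instant; reads render it in the session time zone
    CREATE TABLE demo.test_db.t_tz_doris (col TIMESTAMP) USING iceberg;

Before this fix, Doris converted values for both column types using the
session time zone, so timestamp_ntz data written by Doris read back
shifted in Spark.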
---
be/src/pipeline/exec/result_sink_operator.h | 4 ++
.../serde/data_type_datetimev2_serde.cpp | 7 ++-
.../format/table/iceberg/arrow_schema_util.cpp | 4 +-
be/src/vec/runtime/vparquet_transformer.cpp | 45 +++++++-------
be/src/vec/runtime/vparquet_transformer.h | 25 ++++----
.../writer/iceberg/viceberg_partition_writer.cpp | 8 +--
be/src/vec/sink/writer/vfile_result_writer.cpp | 5 +-
be/src/vec/sink/writer/vhive_partition_writer.cpp | 8 +--
.../create_preinstalled_scripts/iceberg/run12.sql | 13 ++++
.../fileformat/ParquetFileFormatProperties.java | 12 ++++
.../ParquetFileFormatPropertiesTest.java | 11 ++++
gensrc/thrift/DataSinks.thrift | 5 ++
.../write/test_iceberg_write_timestamp_ntz.out | Bin 0 -> 257 bytes
.../write/test_iceberg_write_timestamp_ntz.groovy | 68 +++++++++++++++++++++
14 files changed, 170 insertions(+), 45 deletions(-)
diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h
index 6b0bea25fa7..cf2a928787d 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -48,6 +48,7 @@ struct ResultFileOptions {
TParquetCompressionType::type parquet_commpression_type;
TParquetVersion::type parquet_version;
bool parquert_disable_dictionary;
+ bool enable_int96_timestamps;
    //note: use outfile with parquet format, have deprecated 9:schema and 10:file_properties
    //But in order to consider the compatibility when upgrading, so add a bool to check
    //Now the code version is 1.1.2, so when the version is after 1.2, could remove this code.
@@ -104,6 +105,9 @@ struct ResultFileOptions {
if (t_opt.__isset.parquet_version) {
parquet_version = t_opt.parquet_version;
}
+ if (t_opt.__isset.enable_int96_timestamps) {
+ enable_int96_timestamps = t_opt.enable_int96_timestamps;
+ }
if (t_opt.__isset.orc_schema) {
orc_schema = t_opt.orc_schema;
}
diff --git a/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp b/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
index 06bd9635f95..21c69d58622 100644
--- a/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
@@ -18,6 +18,7 @@
#include "data_type_datetimev2_serde.h"
#include <arrow/builder.h>
+#include <cctz/time_zone.h>
#include <chrono> // IWYU pragma: keep
#include <cstdint>
@@ -96,6 +97,10 @@ Status DataTypeDateTimeV2SerDe::write_column_to_arrow(const IColumn& column,
                                                       const cctz::time_zone& ctz) const {
     const auto& col_data = static_cast<const ColumnDateTimeV2&>(column).get_data();
     auto& timestamp_builder = assert_cast<arrow::TimestampBuilder&>(*array_builder);
+    std::shared_ptr<arrow::TimestampType> timestamp_type =
+            std::static_pointer_cast<arrow::TimestampType>(array_builder->type());
+    const std::string& timezone = timestamp_type->timezone();
+    const cctz::time_zone& real_ctz = timezone == "" ? cctz::utc_time_zone() : ctz;
     for (size_t i = start; i < end; ++i) {
         if (null_map && (*null_map)[i]) {
             RETURN_IF_ERROR(checkArrowStatus(timestamp_builder.AppendNull(), column.get_name(),
@@ -104,7 +109,7 @@ Status DataTypeDateTimeV2SerDe::write_column_to_arrow(const IColumn& column,
         int64_t timestamp = 0;
         DateV2Value<DateTimeV2ValueType> datetime_val =
                 binary_cast<UInt64, DateV2Value<DateTimeV2ValueType>>(col_data[i]);
-        datetime_val.unix_timestamp(&timestamp, ctz);
+        datetime_val.unix_timestamp(&timestamp, real_ctz);
         if (scale > 3) {
             uint32_t microsecond = datetime_val.microsecond();
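The effect of the UTC fallback above, sketched in Doris SQL (table names
follow the regression test at the end of this patch; the comment states
the intended timestamp_ntz semantics, not a verified output):

    set time_zone = 'Asia/Shanghai';
    insert into t_ntz_doris values ('2025-02-07 20:12:00');
    set time_zone = 'Europe/Tirane';
    select * from t_ntz_doris;
    -- expected: 2025-02-07 20:12:00, the wall-clock literal, regardless of session time zone

When the Arrow timestamp type carries no time zone (the ntz case), the
value is converted with UTC, so the stored microseconds encode the
literal wall-clock time instead of a session-dependent instant.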
diff --git a/be/src/vec/exec/format/table/iceberg/arrow_schema_util.cpp b/be/src/vec/exec/format/table/iceberg/arrow_schema_util.cpp
index 77ac705076b..241560120f7 100644
--- a/be/src/vec/exec/format/table/iceberg/arrow_schema_util.cpp
+++ b/be/src/vec/exec/format/table/iceberg/arrow_schema_util.cpp
@@ -70,7 +70,9 @@ Status ArrowSchemaUtil::convert_to(const iceberg::NestedField& field,
         break;
     case iceberg::TypeID::TIMESTAMP: {
-        arrow_type = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO, timezone);
+        iceberg::TimestampType* t_type = static_cast<iceberg::TimestampType*>(field.field_type());
+        std::string real_tz = t_type->should_adjust_to_utc() ? timezone : "";
+        arrow_type = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO, real_tz);
         break;
     }
diff --git a/be/src/vec/runtime/vparquet_transformer.cpp b/be/src/vec/runtime/vparquet_transformer.cpp
index 2569e529b48..cfd556d70e5 100644
--- a/be/src/vec/runtime/vparquet_transformer.cpp
+++ b/be/src/vec/runtime/vparquet_transformer.cpp
@@ -177,18 +177,17 @@ void ParquetBuildHelper::build_version(parquet::WriterProperties::Builder& build
     }
 }
-VParquetTransformer::VParquetTransformer(
-        RuntimeState* state, doris::io::FileWriter* file_writer,
-        const VExprContextSPtrs& output_vexpr_ctxs, std::vector<std::string> column_names,
-        TParquetCompressionType::type compression_type, bool parquet_disable_dictionary,
-        TParquetVersion::type parquet_version, bool output_object_data,
-        const std::string* iceberg_schema_json, const iceberg::Schema* iceberg_schema)
+VParquetTransformer::VParquetTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
+                                         const VExprContextSPtrs& output_vexpr_ctxs,
+                                         std::vector<std::string> column_names,
+                                         bool output_object_data,
+                                         const ParquetFileOptions& parquet_options,
+                                         const std::string* iceberg_schema_json,
+                                         const iceberg::Schema* iceberg_schema)
         : VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
           _column_names(std::move(column_names)),
           _parquet_schemas(nullptr),
-          _compression_type(compression_type),
-          _parquet_disable_dictionary(parquet_disable_dictionary),
-          _parquet_version(parquet_version),
+          _parquet_options(parquet_options),
           _iceberg_schema_json(iceberg_schema_json),
           _iceberg_schema(iceberg_schema) {
     _outstream = std::shared_ptr<ParquetOutputStream>(new ParquetOutputStream(file_writer));
@@ -197,16 +196,12 @@ VParquetTransformer::VParquetTransformer(
 VParquetTransformer::VParquetTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
                                          const VExprContextSPtrs& output_vexpr_ctxs,
                                          const std::vector<TParquetSchema>& parquet_schemas,
-                                         TParquetCompressionType::type compression_type,
-                                         bool parquet_disable_dictionary,
-                                         TParquetVersion::type parquet_version,
                                          bool output_object_data,
+                                         const ParquetFileOptions& parquet_options,
                                          const std::string* iceberg_schema_json)
         : VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
           _parquet_schemas(&parquet_schemas),
-          _compression_type(compression_type),
-          _parquet_disable_dictionary(parquet_disable_dictionary),
-          _parquet_version(parquet_version),
+          _parquet_options(parquet_options),
           _iceberg_schema_json(iceberg_schema_json) {
     _iceberg_schema = nullptr;
     _outstream = std::shared_ptr<ParquetOutputStream>(new ParquetOutputStream(file_writer));
@@ -215,10 +210,12 @@ VParquetTransformer::VParquetTransformer(RuntimeState* state, doris::io::FileWri
 Status VParquetTransformer::_parse_properties() {
     try {
         arrow::MemoryPool* pool = ExecEnv::GetInstance()->arrow_memory_pool();
+
+        //build parquet writer properties
         parquet::WriterProperties::Builder builder;
-        ParquetBuildHelper::build_compression_type(builder, _compression_type);
-        ParquetBuildHelper::build_version(builder, _parquet_version);
-        if (_parquet_disable_dictionary) {
+        ParquetBuildHelper::build_compression_type(builder, _parquet_options.compression_type);
+        ParquetBuildHelper::build_version(builder, _parquet_options.parquet_version);
+        if (_parquet_options.parquet_disable_dictionary) {
             builder.disable_dictionary();
         } else {
             builder.enable_dictionary();
@@ -228,10 +225,14 @@ Status VParquetTransformer::_parse_properties() {
         builder.max_row_group_length(std::numeric_limits<int64_t>::max());
         builder.memory_pool(pool);
         _parquet_writer_properties = builder.build();
-        _arrow_properties = parquet::ArrowWriterProperties::Builder()
-                                    .enable_deprecated_int96_timestamps()
-                                    ->store_schema()
-                                    ->build();
+
+        //build arrow writer properties
+        parquet::ArrowWriterProperties::Builder arrow_builder;
+        if (_parquet_options.enable_int96_timestamps) {
+            arrow_builder.enable_deprecated_int96_timestamps();
+        }
+        arrow_builder.store_schema();
+        _arrow_properties = arrow_builder.build();
     } catch (const parquet::ParquetException& e) {
         return Status::InternalError("parquet writer parse properties error: {}", e.what());
     }
diff --git a/be/src/vec/runtime/vparquet_transformer.h b/be/src/vec/runtime/vparquet_transformer.h
index bb552a06136..f958b18c4c0 100644
--- a/be/src/vec/runtime/vparquet_transformer.h
+++ b/be/src/vec/runtime/vparquet_transformer.h
@@ -85,23 +85,28 @@ public:
const TParquetVersion::type& parquet_version);
};
+struct ParquetFileOptions {
+ TParquetCompressionType::type compression_type;
+ TParquetVersion::type parquet_version;
+ bool parquet_disable_dictionary = false;
+ bool enable_int96_timestamps = false;
+};
+
// a wrapper of parquet output stream
class VParquetTransformer final : public VFileFormatTransformer {
public:
    VParquetTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
                        const VExprContextSPtrs& output_vexpr_ctxs,
-                        std::vector<std::string> column_names,
-                        TParquetCompressionType::type compression_type,
-                        bool parquet_disable_dictionary, TParquetVersion::type parquet_version,
-                        bool output_object_data, const std::string* iceberg_schema_json = nullptr,
+                        std::vector<std::string> column_names, bool output_object_data,
+                        const ParquetFileOptions& parquet_options,
+                        const std::string* iceberg_schema_json = nullptr,
                        const iceberg::Schema* iceberg_schema = nullptr);
    VParquetTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
                        const VExprContextSPtrs& output_vexpr_ctxs,
-                        const std::vector<TParquetSchema>& parquet_schemas,
-                        TParquetCompressionType::type compression_type,
-                        bool parquet_disable_dictionary, TParquetVersion::type parquet_version,
-                        bool output_object_data, const std::string* iceberg_schema_json = nullptr);
+                        const std::vector<TParquetSchema>& parquet_schemas, bool output_object_data,
+                        const ParquetFileOptions& parquet_options,
+                        const std::string* iceberg_schema_json = nullptr);
~VParquetTransformer() override = default;
@@ -126,9 +131,7 @@ private:
std::vector<std::string> _column_names;
const std::vector<TParquetSchema>* _parquet_schemas = nullptr;
- const TParquetCompressionType::type _compression_type;
- const bool _parquet_disable_dictionary;
- const TParquetVersion::type _parquet_version;
+ const ParquetFileOptions _parquet_options;
const std::string* _iceberg_schema_json;
uint64_t _write_size = 0;
const iceberg::Schema* _iceberg_schema;
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
index aeaa81d9995..933c8d3f562 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
@@ -64,7 +64,6 @@ Status VIcebergPartitionWriter::open(RuntimeState* state, RuntimeProfile* profil
     switch (_file_format_type) {
     case TFileFormatType::FORMAT_PARQUET: {
-        bool parquet_disable_dictionary = false;
         TParquetCompressionType::type parquet_compression_type;
         switch (_compress_type) {
         case TFileCompressType::PLAIN: {
@@ -84,10 +83,11 @@ Status VIcebergPartitionWriter::open(RuntimeState* state, RuntimeProfile* profil
                                          to_string(_compress_type));
         }
         }
+        ParquetFileOptions parquet_options = {parquet_compression_type,
+                                              TParquetVersion::PARQUET_1_0, false, false};
         _file_format_transformer.reset(new VParquetTransformer(
-                state, _file_writer.get(), _write_output_expr_ctxs, _write_column_names,
-                parquet_compression_type, parquet_disable_dictionary, TParquetVersion::PARQUET_1_0,
-                false, _iceberg_schema_json, &_schema));
+                state, _file_writer.get(), _write_output_expr_ctxs, _write_column_names, false,
+                parquet_options, _iceberg_schema_json, &_schema));
return _file_format_transformer->open();
}
case TFileFormatType::FORMAT_ORC: {
diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp b/be/src/vec/sink/writer/vfile_result_writer.cpp
index bb3fdd42696..fc286d7e8a4 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -135,8 +135,9 @@ Status VFileResultWriter::_create_file_writer(const std::string& file_name) {
     case TFileFormatType::FORMAT_PARQUET:
         _vfile_writer.reset(new VParquetTransformer(
                 _state, _file_writer_impl.get(), _vec_output_expr_ctxs, _file_opts->parquet_schemas,
-                _file_opts->parquet_commpression_type, _file_opts->parquert_disable_dictionary,
-                _file_opts->parquet_version, _output_object_data));
+                _output_object_data,
+                {_file_opts->parquet_commpression_type, _file_opts->parquet_version,
+                 _file_opts->parquert_disable_dictionary, _file_opts->enable_int96_timestamps}));
         break;
case TFileFormatType::FORMAT_ORC:
_vfile_writer.reset(new VOrcTransformer(
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp b/be/src/vec/sink/writer/vhive_partition_writer.cpp
index b93303dff03..70f588c4c7e 100644
--- a/be/src/vec/sink/writer/vhive_partition_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp
@@ -70,7 +70,6 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
     switch (_file_format_type) {
     case TFileFormatType::FORMAT_PARQUET: {
-        bool parquet_disable_dictionary = false;
         TParquetCompressionType::type parquet_compression_type;
         switch (_hive_compress_type) {
         case TFileCompressType::PLAIN: {
@@ -90,10 +89,11 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
                                          to_string(_hive_compress_type));
         }
         }
+        ParquetFileOptions parquet_options = {parquet_compression_type,
+                                              TParquetVersion::PARQUET_1_0, false, true};
         _file_format_transformer = std::make_unique<VParquetTransformer>(
-                state, _file_writer.get(), _write_output_expr_ctxs, _write_column_names,
-                parquet_compression_type, parquet_disable_dictionary, TParquetVersion::PARQUET_1_0,
-                false);
+                state, _file_writer.get(), _write_output_expr_ctxs, _write_column_names, false,
+                parquet_options);
return _file_format_transformer->open();
}
case TFileFormatType::FORMAT_ORC: {
diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run12.sql b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run12.sql
new file mode 100644
index 00000000000..53778efd09c
--- /dev/null
+++ b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run12.sql
@@ -0,0 +1,13 @@
+
+use demo.test_db;
+
+
+drop table if exists t_ntz_doris;
+CREATE TABLE t_ntz_doris (
+ col TIMESTAMP_NTZ)
+USING iceberg;
+
+drop table if exists t_tz_doris;
+CREATE TABLE t_tz_doris (
+ col TIMESTAMP)
+USING iceberg;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatProperties.java
index 18d1484e596..9c4d5f2ae49 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatProperties.java
@@ -36,6 +36,7 @@ import java.util.Map.Entry;
public class ParquetFileFormatProperties extends FileFormatProperties {
    public static final String PARQUET_DISABLE_DICTIONARY = "disable_dictionary";
    public static final String PARQUET_VERSION = "version";
+    public static final String ENABLE_INT96_TIMESTAMPS = "enable_int96_timestamps";
    public static final String PARQUET_PROP_PREFIX = "parquet.";
    public static final Logger LOG = LogManager.getLogger(ParquetFileFormatProperties.class);
@@ -60,6 +61,7 @@ public class ParquetFileFormatProperties extends FileFormatProperties {
    private TParquetCompressionType parquetCompressionType = TParquetCompressionType.SNAPPY;
private boolean parquetDisableDictionary = false;
private TParquetVersion parquetVersion = TParquetVersion.PARQUET_1_0;
+ private boolean enableInt96Timestamps = true;
    public ParquetFileFormatProperties() {
        super(TFileFormatType.FORMAT_PARQUET, FileFormatProperties.FORMAT_PARQUET);
@@ -82,6 +84,11 @@ public class ParquetFileFormatProperties extends FileFormatProperties {
}
}
+ //save the enable int96 timestamp property
+ if (formatProperties.containsKey(ENABLE_INT96_TIMESTAMPS)) {
+            this.enableInt96Timestamps = Boolean.valueOf(formatProperties.get(ENABLE_INT96_TIMESTAMPS)).booleanValue();
+ }
+
// save all parquet prefix property
        Iterator<Entry<String, String>> iter = formatProperties.entrySet().iterator();
while (iter.hasNext()) {
@@ -109,6 +116,7 @@ public class ParquetFileFormatProperties extends FileFormatProperties {
sinkOptions.setParquetCompressionType(parquetCompressionType);
sinkOptions.setParquetDisableDictionary(parquetDisableDictionary);
sinkOptions.setParquetVersion(parquetVersion);
+ sinkOptions.setEnableInt96Timestamps(enableInt96Timestamps);
}
@Override
@@ -126,4 +134,8 @@ public class ParquetFileFormatProperties extends FileFormatProperties {
public boolean isParquetDisableDictionary() {
return parquetDisableDictionary;
}
+
+ public boolean isEnableInt96Timestamps() {
+ return enableInt96Timestamps;
+ }
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatPropertiesTest.java
index 754d857613f..370e4965765 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatPropertiesTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatPropertiesTest.java
@@ -94,6 +94,17 @@ public class ParquetFileFormatPropertiesTest {
Assert.assertFalse(parquetFileFormatProperties.isParquetDisableDictionary());
}
+    @Test
+    public void testEnableInt96Timestamps() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("enable_int96_timestamps", "true");
+        parquetFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertTrue(parquetFileFormatProperties.isEnableInt96Timestamps());
+        properties.put("enable_int96_timestamps", "false");
+        parquetFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertFalse(parquetFileFormatProperties.isEnableInt96Timestamps());
+    }
+
@Test
public void testParquetVersion() {
Map<String, String> properties = new HashMap<>();
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index 8899889e506..17572a1bca6 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -141,6 +141,11 @@ struct TResultFileSinkOptions {
// orc_writer_version = 1 means doris FE is higher than version 2.1.5
// orc_writer_version = 0 means doris FE is less than or equal to version 2.1.5
20: optional i64 orc_writer_version;
+
+ //iceberg write sink use int64
+ //hive write sink use int96
+ //export data to file use by user define properties
+ 21: optional bool enable_int96_timestamps
}
struct TMemoryScratchSink {
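For the export path mentioned in the new comment above, the flag
surfaces as the Parquet format property parsed by
ParquetFileFormatProperties. A hedged usage sketch (the output location
is illustrative and assumes exporting to a local file is permitted by
the cluster configuration):

    SELECT * FROM t
    INTO OUTFILE "file:///tmp/result_"
    FORMAT AS PARQUET
    PROPERTIES ("enable_int96_timestamps" = "false");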
diff --git a/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_timestamp_ntz.out b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_timestamp_ntz.out
new file mode 100644
index 00000000000..1980a167f36
Binary files /dev/null and b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_timestamp_ntz.out differ
diff --git a/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_timestamp_ntz.groovy b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_timestamp_ntz.groovy
new file mode 100644
index 00000000000..33418c0e247
--- /dev/null
+++ b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_timestamp_ntz.groovy
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_write_timestamp_ntz",
"p0,external,iceberg,external_docker,external_docker_iceberg") {
+
+ String enabled = context.config.otherConfigs.get("enableIcebergTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("disable iceberg test.")
+ return
+ }
+
+
+ try {
+
+        String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+        String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String catalog_name = "iceberg_timestamp_ntz_test"
+
+ sql """drop catalog if exists ${catalog_name}"""
+ sql """
+ CREATE CATALOG ${catalog_name} PROPERTIES (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='rest',
+ 'uri' = 'http://${externalEnvIp}:${rest_port}',
+ "s3.access_key" = "admin",
+ "s3.secret_key" = "password",
+ "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+ "s3.region" = "us-east-1"
+ );"""
+
+ logger.info("catalog " + catalog_name + " created")
+ sql """switch ${catalog_name};"""
+ logger.info("switched to catalog " + catalog_name)
+ sql """ use test_db;"""
+
+ sql """INSERT INTO t_ntz_doris VALUES ('2025-02-07 20:12:00');"""
+ sql """INSERT INTO t_tz_doris VALUES ('2025-02-07 20:12:01');"""
+
+
+ sql "set time_zone = 'Asia/Shanghai'"
+ qt_timestamp_ntz """select * from t_ntz_doris;"""
+ qt_timestamp_tz """select * from t_tz_doris;"""
+
+ sql "set time_zone = 'Europe/Tirane'"
+ qt_timestamp_ntz2 """select * from t_ntz_doris;"""
+ qt_timestamp_tz2 """select * from t_tz_doris;"""
+
+ // sql """drop catalog if exists ${catalog_name}"""
+
+ } finally {
+
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]