This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/main by this push:
new f326f6a6b ORC-1388: [C++] Support schema evolution from decimal to
timestamp/string group
f326f6a6b is described below
commit f326f6a6b627f43677f5e4c245f19bf461e46820
Author: ffacs <[email protected]>
AuthorDate: Tue Jan 30 13:52:22 2024 +0800
ORC-1388: [C++] Support schema evolution from decimal to timestamp/string
group
### What changes were proposed in this pull request?
Support conversion from {decimal} to {string, char, varchar, timestamp, timestamp_instant}.
### Why are the changes needed?
To support these schema evolution paths in the C++ reader.
### How was this patch tested?
Unit tests passed (new cases in TestConvertColumnReader.cc and TestSchemaEvolution.cc).
### Was this patch authored or co-authored using generative AI tooling?
NO
Closes #1761 from ffacs/ORC-1388.
Authored-by: ffacs <[email protected]>
Signed-off-by: Gang Wu <[email protected]>
---
c++/src/ConvertColumnReader.cc | 138 +++++++++++++++++++++++++++++++---
c++/src/SchemaEvolution.cc | 3 +-
c++/test/TestConvertColumnReader.cc | 143 ++++++++++++++++++++++++++++++++++++
c++/test/TestSchemaEvolution.cc | 16 ++++
4 files changed, 287 insertions(+), 13 deletions(-)
diff --git a/c++/src/ConvertColumnReader.cc b/c++/src/ConvertColumnReader.cc
index a1e29ba58..c139cfa9d 100644
--- a/c++/src/ConvertColumnReader.cc
+++ b/c++/src/ConvertColumnReader.cc
@@ -593,6 +593,107 @@ namespace orc {
int32_t toScale;
};
+ template <typename FileTypeBatch>
+ class DecimalToTimestampColumnReader : public ConvertToTimestampColumnReader
{
+ public:
+ DecimalToTimestampColumnReader(const Type& _readType, const Type& fileType,
+ StripeStreams& stripe, bool
_throwOnOverflow)
+ : ConvertToTimestampColumnReader(_readType, fileType, stripe,
_throwOnOverflow),
+ precision(static_cast<int32_t>(fileType.getPrecision())),
+ scale(static_cast<int32_t>(fileType.getScale())) {}
+
+ void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull)
override {
+ ConvertColumnReader::next(rowBatch, numValues, notNull);
+ const auto& srcBatch = *SafeCastBatchTo<const
FileTypeBatch*>(data.get());
+ auto& dstBatch = *SafeCastBatchTo<TimestampVectorBatch*>(&rowBatch);
+ for (uint64_t i = 0; i < rowBatch.numElements; ++i) {
+ if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+ convertDecimalToTimestamp(dstBatch, i, srcBatch);
+ }
+ }
+ }
+
+ private:
+ void convertDecimalToTimestamp(TimestampVectorBatch& dstBatch, uint64_t
idx,
+ const FileTypeBatch& srcBatch) {
+ constexpr int SecondToNanoFactor = 9;
+ // Following constant comes from java.time.Instant
+ // '-1000000000-01-01T00:00Z'
+ constexpr int64_t MIN_EPOCH_SECONDS = -31557014167219200L;
+ // '1000000000-12-31T23:59:59.999999999Z'
+ constexpr int64_t MAX_EPOCH_SECONDS = 31556889864403199L;
+ // dummy variable, there's no risk of overflow
+ bool overflow = false;
+
+ Int128 i128(srcBatch.values[idx]);
+ Int128 integerPortion = scaleDownInt128ByPowerOfTen(i128, scale);
+ if (integerPortion < MIN_EPOCH_SECONDS || integerPortion >
MAX_EPOCH_SECONDS) {
+ handleOverflow<Decimal, int64_t>(dstBatch, idx, throwOnOverflow);
+ return;
+ }
+ i128 -= scaleUpInt128ByPowerOfTen(integerPortion, scale, overflow);
+ Int128 fractionPortion = std::move(i128);
+ if (scale < SecondToNanoFactor) {
+ fractionPortion =
+ scaleUpInt128ByPowerOfTen(fractionPortion, SecondToNanoFactor -
scale, overflow);
+ } else {
+ fractionPortion = scaleDownInt128ByPowerOfTen(fractionPortion, scale -
SecondToNanoFactor);
+ }
+ if (fractionPortion < 0) {
+ fractionPortion += 1e9;
+ integerPortion -= 1;
+ }
+ // line 630 has guaranteed toLong() will not overflow
+ dstBatch.data[idx] = integerPortion.toLong();
+ dstBatch.nanoseconds[idx] = fractionPortion.toLong();
+
+ if (needConvertTimezone) {
+ dstBatch.data[idx] = readerTimezone.convertFromUTC(dstBatch.data[idx]);
+ }
+ }
+
+ const int32_t precision;
+ const int32_t scale;
+ };
+
+ template <typename FileTypeBatch>
+ class DecimalToStringVariantColumnReader : public
ConvertToStringVariantColumnReader {
+ public:
+ DecimalToStringVariantColumnReader(const Type& _readType, const Type&
fileType,
+ StripeStreams& stripe, bool
_throwOnOverflow)
+ : ConvertToStringVariantColumnReader(_readType, fileType, stripe,
_throwOnOverflow),
+ scale(fileType.getScale()) {}
+
+ uint64_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t
numValues) override {
+ uint64_t size = 0;
+ strBuffer.resize(numValues);
+ const auto& srcBatch = *SafeCastBatchTo<const
FileTypeBatch*>(data.get());
+ if (readType.getKind() == STRING) {
+ for (uint64_t i = 0; i < rowBatch.numElements; ++i) {
+ if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+ strBuffer[i] = Int128(srcBatch.values[i]).toDecimalString(scale,
true);
+ size += strBuffer[i].size();
+ }
+ }
+ } else {
+ const auto maxLength = readType.getMaximumLength();
+ for (uint64_t i = 0; i < rowBatch.numElements; ++i) {
+ if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+ strBuffer[i] = Int128(srcBatch.values[i]).toDecimalString(scale,
true);
+ }
+ if (strBuffer[i].size() > maxLength) {
+ strBuffer[i].resize(maxLength);
+ }
+ size += strBuffer[i].size();
+ }
+ }
+ return size;
+ }
+
+ private:
+ const int32_t scale;
+ };
+
// Declares FROM##To##TO##ColumnReader as an alias of the numeric conversion
// reader between FROM##VectorBatch and TO##VectorBatch, with TYPE passed as
// the third template argument.
#define DEFINE_NUMERIC_CONVERT_READER(FROM, TO, TYPE)                                   \
  using FROM##To##TO##ColumnReader = NumericConvertColumnReader<FROM##VectorBatch, \
                                                                TO##VectorBatch, TYPE>;
@@ -621,6 +722,14 @@ namespace orc {
using Decimal128##To##TO##ColumnReader = \
DecimalConvertColumnReader<Decimal128VectorBatch, TO##VectorBatch>;
+#define DEFINE_DECIMAL_CONVERT_TO_TIMESTAMP_READER
\
+ using Decimal64ToTimestampColumnReader =
DecimalToTimestampColumnReader<Decimal64VectorBatch>; \
+ using Decimal128ToTimestampColumnReader =
DecimalToTimestampColumnReader<Decimal128VectorBatch>;
+
+#define DEFINE_DECIMAL_CONVERT_TO_STRING_VARINT_READER(TO)
\
+ using Decimal64To##TO##ColumnReader =
DecimalToStringVariantColumnReader<Decimal64VectorBatch>; \
+ using Decimal128To##TO##ColumnReader =
DecimalToStringVariantColumnReader<Decimal128VectorBatch>;
+
DEFINE_NUMERIC_CONVERT_READER(Boolean, Byte, int8_t)
DEFINE_NUMERIC_CONVERT_READER(Boolean, Short, int16_t)
DEFINE_NUMERIC_CONVERT_READER(Boolean, Int, int32_t)
@@ -720,6 +829,11 @@ namespace orc {
// Decimal -> decimal readers.
DEFINE_DECIMAL_CONVERT_TO_DECIMAL_READER(Decimal64)
DEFINE_DECIMAL_CONVERT_TO_DECIMAL_READER(Decimal128)

// Decimal -> string-group readers (STRING / CHAR / VARCHAR).
DEFINE_DECIMAL_CONVERT_TO_STRING_VARINT_READER(String)
DEFINE_DECIMAL_CONVERT_TO_STRING_VARINT_READER(Char)
DEFINE_DECIMAL_CONVERT_TO_STRING_VARINT_READER(Varchar)
// Decimal -> timestamp readers.
DEFINE_DECIMAL_CONVERT_TO_TIMESTAMP_READER

// Expands to a `return` statement that builds reader NAME from the enclosing
// function's _readType, fileType, stripe and throwOnOverflow.
#define CREATE_READER(NAME) \
  return std::make_unique<NAME>(_readType, fileType, stripe, throwOnOverflow);
@@ -935,13 +1049,6 @@ namespace orc {
CASE_EXCEPTION
}
}
- case STRING:
- case BINARY:
- case TIMESTAMP:
- case LIST:
- case MAP:
- case STRUCT:
- case UNION:
case DECIMAL: {
switch (_readType.getKind()) {
CASE_CREATE_FROM_DECIMAL_READER(BOOLEAN, Boolean)
@@ -951,6 +1058,11 @@ namespace orc {
CASE_CREATE_FROM_DECIMAL_READER(LONG, Long)
CASE_CREATE_FROM_DECIMAL_READER(FLOAT, Float)
CASE_CREATE_FROM_DECIMAL_READER(DOUBLE, Double)
+ CASE_CREATE_FROM_DECIMAL_READER(STRING, String)
+ CASE_CREATE_FROM_DECIMAL_READER(CHAR, Char)
+ CASE_CREATE_FROM_DECIMAL_READER(VARCHAR, Varchar)
+ CASE_CREATE_FROM_DECIMAL_READER(TIMESTAMP, Timestamp)
+ CASE_CREATE_FROM_DECIMAL_READER(TIMESTAMP_INSTANT, Timestamp)
case DECIMAL: {
if (isDecimal64(fileType)) {
if (isDecimal64(_readType)) {
@@ -966,11 +1078,6 @@ namespace orc {
}
}
}
- case STRING:
- case CHAR:
- case VARCHAR:
- case TIMESTAMP:
- case TIMESTAMP_INSTANT:
case BINARY:
case LIST:
case MAP:
@@ -980,6 +1087,13 @@ namespace orc {
CASE_EXCEPTION
}
}
+ case STRING:
+ case BINARY:
+ case TIMESTAMP:
+ case LIST:
+ case MAP:
+ case STRUCT:
+ case UNION:
case DATE:
case VARCHAR:
case CHAR:
diff --git a/c++/src/SchemaEvolution.cc b/c++/src/SchemaEvolution.cc
index b8c4fd404..dc30e6118 100644
--- a/c++/src/SchemaEvolution.cc
+++ b/c++/src/SchemaEvolution.cc
@@ -99,7 +99,8 @@ namespace orc {
break;
}
case DECIMAL: {
- ret.isValid = ret.needConvert = isNumeric(readType);
+ ret.isValid = ret.needConvert =
+ isNumeric(readType) || isStringVariant(readType) ||
isTimestamp(readType);
break;
}
case STRING:
diff --git a/c++/test/TestConvertColumnReader.cc
b/c++/test/TestConvertColumnReader.cc
index 4aabce047..83798289d 100644
--- a/c++/test/TestConvertColumnReader.cc
+++ b/c++/test/TestConvertColumnReader.cc
@@ -16,6 +16,7 @@
* limitations under the License.
*/
+#include "Timezone.hh"
#include "orc/Type.hh"
#include "wrap/gtest-wrapper.h"
@@ -672,4 +673,146 @@ namespace orc {
}
}
+ TEST(ConvertColumnReader, TestConvertDecimalToTimestamp) {
+ constexpr int DEFAULT_MEM_STREAM_SIZE = 10 * 1024 * 1024;
+ constexpr int TEST_CASES = 1024;
+ std::string writerTimezoneName = "America/New_York";
+ std::string readerTimezoneName = "Australia/Sydney";
+ MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+ std::unique_ptr<Type> fileType(
+
Type::buildTypeFromString("struct<c1:decimal(14,4),c2:decimal(25,10)>"));
+ std::shared_ptr<Type> readType(
+ Type::buildTypeFromString("struct<c1:timestamp,c2:timestamp with local
time zone>"));
+ WriterOptions options;
+ options.setUseTightNumericVector(true);
+ options.setTimezoneName(writerTimezoneName);
+ auto writer = createWriter(*fileType, &memStream, options);
+ auto batch = writer->createRowBatch(TEST_CASES);
+ auto structBatch = dynamic_cast<StructVectorBatch*>(batch.get());
+ auto& c1 = dynamic_cast<Decimal64VectorBatch&>(*structBatch->fields[0]);
+ auto& c2 = dynamic_cast<Decimal128VectorBatch&>(*structBatch->fields[1]);
+
+ auto convertToSeconds = [](const Timezone& writerTimezone, const
std::string& date) {
+ tm timeStruct;
+ if (strptime(date.c_str(), "%Y-%m-%d %H:%M:%S", &timeStruct) == nullptr)
{
+ throw TimezoneError("bad time " + date);
+ }
+ return writerTimezone.convertFromUTC(timegm(&timeStruct));
+ };
+
+ std::vector<std::string> timeStrings;
+ for (int i = 0; i < TEST_CASES; i++) {
+ int64_t year = 1960 + (i / 12);
+ int64_t month = i % 12 + 1;
+ int64_t day = 27;
+ std::string others = "23:45:56";
+ std::stringstream ss;
+ ss << year << "-";
+ ss << std::setfill('0') << std::setw(2) << month << "-" << day << " " <<
others;
+ timeStrings.push_back(ss.str());
+ }
+ std::vector<int64_t> ts[2];
+ for (auto& time : timeStrings) {
+ ts[0].emplace_back(convertToSeconds(getTimezoneByName("GMT"), time));
+
ts[1].emplace_back(convertToSeconds(getTimezoneByName(writerTimezoneName),
time));
+ }
+ bool overflow = false;
+
+ for (int i = 0; i < TEST_CASES; i++) {
+ c1.values[i] = ts[0][i] * 10000 + 1234;
+ c2.values[i] = scaleUpInt128ByPowerOfTen(Int128(ts[1][i]), 10, overflow)
+=
+ Int128("1234567895");
+ assert(!overflow);
+ }
+
+ structBatch->numElements = c1.numElements = c2.numElements = TEST_CASES;
+ structBatch->hasNulls = c1.hasNulls = c2.hasNulls = false;
+ writer->add(*batch);
+ writer->close();
+ auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(),
memStream.getLength());
+ auto pool = getDefaultPool();
+ auto reader = createReader(*pool, std::move(inStream));
+ RowReaderOptions rowReaderOptions;
+ rowReaderOptions.setUseTightNumericVector(true);
+ rowReaderOptions.setReadType(readType);
+ rowReaderOptions.setTimezoneName(readerTimezoneName);
+ auto rowReader = reader->createRowReader(rowReaderOptions);
+ auto readBatch = rowReader->createRowBatch(TEST_CASES);
+ EXPECT_EQ(true, rowReader->next(*readBatch));
+
+ auto& readSturctBatch = dynamic_cast<StructVectorBatch&>(*readBatch);
+ auto& readC1 =
dynamic_cast<TimestampVectorBatch&>(*readSturctBatch.fields[0]);
+ auto& readC2 =
dynamic_cast<TimestampVectorBatch&>(*readSturctBatch.fields[1]);
+ for (int i = 0; i < TEST_CASES; i++) {
+ size_t idx = static_cast<size_t>(i);
+ EXPECT_TRUE(readC1.notNull[idx]) << i;
+ EXPECT_TRUE(readC2.notNull[idx]) << i;
+
EXPECT_EQ(getTimezoneByName(readerTimezoneName).convertToUTC(readC1.data[i]),
ts[0][i]);
+ EXPECT_TRUE(readC1.nanoseconds[i] == 123400000);
+ EXPECT_EQ(readC2.data[i], ts[1][i]);
+ if (readC2.data[i] < 0) {
+ EXPECT_EQ(readC2.nanoseconds[i], 123456790) << timeStrings[i];
+ } else {
+ EXPECT_EQ(readC2.nanoseconds[i], 123456789) << timeStrings[i];
+ }
+ }
+ }
+
+ TEST(ConvertColumnReader, TestConvertDecimalToStringVariant) {
+ constexpr int DEFAULT_MEM_STREAM_SIZE = 10 * 1024 * 1024;
+ constexpr int TEST_CASES = 1024;
+ MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+ std::unique_ptr<Type> fileType(
+
Type::buildTypeFromString("struct<c1:decimal(14,3),c2:decimal(14,3),c3:decimal(14,3)>"));
+ std::shared_ptr<Type> readType(
+
Type::buildTypeFromString("struct<c1:char(5),c2:varchar(5),c3:string>"));
+ WriterOptions options;
+ auto writer = createWriter(*fileType, &memStream, options);
+ auto batch = writer->createRowBatch(TEST_CASES);
+ auto structBatch = dynamic_cast<StructVectorBatch*>(batch.get());
+ auto& c1 = dynamic_cast<Decimal64VectorBatch&>(*structBatch->fields[0]);
+ auto& c2 = dynamic_cast<Decimal64VectorBatch&>(*structBatch->fields[1]);
+ auto& c3 = dynamic_cast<Decimal64VectorBatch&>(*structBatch->fields[2]);
+
+ for (int i = 0; i < TEST_CASES; i++) {
+ c1.values[i] = i * 1000 + 123;
+ c2.values[i] = i * 1000 + 456;
+ c3.values[i] = i * 1000 + 789;
+ }
+ structBatch->numElements = c1.numElements = c2.numElements = TEST_CASES;
+ structBatch->hasNulls = c1.hasNulls = c2.hasNulls = false;
+ writer->add(*batch);
+ writer->close();
+ auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(),
memStream.getLength());
+ auto pool = getDefaultPool();
+ auto reader = createReader(*pool, std::move(inStream));
+ RowReaderOptions rowReaderOptions;
+ rowReaderOptions.setUseTightNumericVector(true);
+ rowReaderOptions.setReadType(readType);
+ auto rowReader = reader->createRowReader(rowReaderOptions);
+ auto readBatch = rowReader->createRowBatch(TEST_CASES);
+ EXPECT_EQ(true, rowReader->next(*readBatch));
+
+ auto& readSturctBatch = dynamic_cast<StructVectorBatch&>(*readBatch);
+ auto& readC1 =
dynamic_cast<StringVectorBatch&>(*readSturctBatch.fields[0]);
+ auto& readC2 =
dynamic_cast<StringVectorBatch&>(*readSturctBatch.fields[1]);
+ auto& readC3 =
dynamic_cast<StringVectorBatch&>(*readSturctBatch.fields[2]);
+ for (int i = 0; i < TEST_CASES; i++) {
+ if (i < 10) {
+ EXPECT_EQ(std::to_string(i) + ".123", std::string(readC1.data[i],
readC1.length[i]));
+ EXPECT_EQ(std::to_string(i) + ".456", std::string(readC2.data[i],
readC2.length[i]));
+ } else if (i >= 10 && i < 100) {
+ EXPECT_EQ(std::to_string(i) + ".12", std::string(readC1.data[i],
readC1.length[i]));
+ EXPECT_EQ(std::to_string(i) + ".45", std::string(readC2.data[i],
readC2.length[i]));
+ } else if (i >= 100 && i < 1000) {
+ EXPECT_EQ(std::to_string(i) + ".1", std::string(readC1.data[i],
readC1.length[i]));
+ EXPECT_EQ(std::to_string(i) + ".4", std::string(readC2.data[i],
readC2.length[i]));
+ } else {
+ EXPECT_EQ(std::to_string(i) + ".", std::string(readC1.data[i],
readC1.length[i]));
+ EXPECT_EQ(std::to_string(i) + ".", std::string(readC2.data[i],
readC2.length[i]));
+ }
+ EXPECT_EQ(std::to_string(i) + ".789", std::string(readC3.data[i],
readC3.length[i]));
+ }
+ }
+
} // namespace orc
diff --git a/c++/test/TestSchemaEvolution.cc b/c++/test/TestSchemaEvolution.cc
index f57e40086..c52ba009f 100644
--- a/c++/test/TestSchemaEvolution.cc
+++ b/c++/test/TestSchemaEvolution.cc
@@ -132,6 +132,22 @@ namespace orc {
}
}
+ // conversion from decimal to string/char/varchar
+ for (size_t i = 12; i <= 13; i++) {
+ for (size_t j = 7; j <= 11; j++) {
+ canConvert[i][j] = true;
+ needConvert[i][j] = true;
+ }
+ }
+
+ // conversion from decimal to timestamp
+ for (size_t i = 12; i <= 13; i++) {
+ for (size_t j = 14; j <= 15; j++) {
+ canConvert[i][j] = true;
+ needConvert[i][j] = true;
+ }
+ }
+
for (size_t i = 0; i < typesSize; i++) {
for (size_t j = 0; j < typesSize; j++) {
testConvertReader(types[i], types[j], canConvert[i][j],
needConvert[i][j]);