This is an automated email from the ASF dual-hosted git repository.
loneylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new c2cfb8705a [CH] Support native base85codec (#9421)
c2cfb8705a is described below
commit c2cfb8705a418a09db3f781edbe0f5857894588e
Author: Shuai li <[email protected]>
AuthorDate: Fri Apr 25 17:02:44 2025 +0800
[CH] Support native base85codec (#9421)
---
.../gluten/vectorized/DeltaWriterJNIWrapper.java | 13 --
.../gluten/sql/shims/delta32/Delta32Shims.scala | 7 --
.../execution/DeletionVectorWriteTransformer.scala | 16 +--
cpp-ch/local-engine/Common/Base85Codec.cpp | 140 +++++++++++++++++++++
cpp-ch/local-engine/Common/Base85Codec.h | 55 ++++++++
.../Storages/SubstraitSource/Delta/DeltaUtil.cpp | 80 ------------
.../Storages/SubstraitSource/Delta/DeltaUtil.h | 51 --------
.../Storages/SubstraitSource/Delta/DeltaWriter.cpp | 104 ++++++++-------
.../Storages/SubstraitSource/Delta/DeltaWriter.h | 25 +---
cpp-ch/local-engine/local_engine_jni.cpp | 10 --
.../tests/gtest_clickhouse_roaring_bitmap.cpp | 16 +++
11 files changed, 273 insertions(+), 244 deletions(-)
diff --git
a/backends-clickhouse/src-delta-32/main/java/org/apache/gluten/vectorized/DeltaWriterJNIWrapper.java
b/backends-clickhouse/src-delta-32/main/java/org/apache/gluten/vectorized/DeltaWriterJNIWrapper.java
index ada976668c..1dfa67764d 100644
---
a/backends-clickhouse/src-delta-32/main/java/org/apache/gluten/vectorized/DeltaWriterJNIWrapper.java
+++
b/backends-clickhouse/src-delta-32/main/java/org/apache/gluten/vectorized/DeltaWriterJNIWrapper.java
@@ -16,7 +16,6 @@
*/
package org.apache.gluten.vectorized;
-import org.apache.spark.sql.execution.DeletionVectorWriteTransformer;
public class DeltaWriterJNIWrapper {
@@ -24,21 +23,9 @@ public class DeltaWriterJNIWrapper {
// utility class
}
- public static native void registerNativeReference();
-
public static native long createDeletionVectorWriter(String tablePath, int
prefix_length, long packingTargetSize);
public static native void deletionVectorWrite(long writer_address, long
block_address);
public static native long deletionVectorWriteFinalize(long writer_address);
-
- // call from native
- public static String encodeUUID(String uuid, String randomPrefix) {
- return DeletionVectorWriteTransformer.encodeUUID(uuid, randomPrefix);
- }
-
- // call from native
- public static String decodeUUID(String encodedUuid) {
- return DeletionVectorWriteTransformer.decodeUUID(encodedUuid);
- }
}
diff --git
a/backends-clickhouse/src-delta-32/main/scala/org/apache/gluten/sql/shims/delta32/Delta32Shims.scala
b/backends-clickhouse/src-delta-32/main/scala/org/apache/gluten/sql/shims/delta32/Delta32Shims.scala
index 0244bf3ffd..52da7b7883 100644
---
a/backends-clickhouse/src-delta-32/main/scala/org/apache/gluten/sql/shims/delta32/Delta32Shims.scala
+++
b/backends-clickhouse/src-delta-32/main/scala/org/apache/gluten/sql/shims/delta32/Delta32Shims.scala
@@ -19,10 +19,7 @@ package org.apache.gluten.sql.shims.delta32
import org.apache.gluten.execution.GlutenPlan
import org.apache.gluten.extension.{DeltaExpressionExtensionTransformer,
ExpressionExtensionTrait}
import org.apache.gluten.sql.shims.DeltaShims
-import org.apache.gluten.vectorized.DeltaWriterJNIWrapper
-import org.apache.spark.SparkContext
-import org.apache.spark.api.plugin.PluginContext
import org.apache.spark.sql.delta.DeltaParquetFileFormat
import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
import org.apache.spark.sql.delta.util.JsonUtils
@@ -44,10 +41,6 @@ class Delta32Shims extends DeltaShims {
DeltaOptimizedWriterTransformer.from(plan)
}
- override def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = {
- DeltaWriterJNIWrapper.registerNativeReference()
- }
-
override def registerExpressionExtension(): Unit = {
ExpressionExtensionTrait.registerExpressionExtension(DeltaExpressionExtensionTransformer())
}
diff --git
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/DeletionVectorWriteTransformer.scala
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/DeletionVectorWriteTransformer.scala
index 2715eff63e..56a756b012 100644
---
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/DeletionVectorWriteTransformer.scala
+++
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/DeletionVectorWriteTransformer.scala
@@ -30,7 +30,7 @@ import
org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
import org.apache.spark.sql.delta.commands.DeletionVectorResult
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore
-import org.apache.spark.sql.delta.util.{Codec, Utils => DeltaUtils}
+import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
import org.apache.spark.sql.execution.datasources.CallTransformer
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -38,7 +38,6 @@ import org.apache.spark.util.Utils
import org.apache.hadoop.fs.Path
-import java.util.UUID
import java.util.concurrent.atomic.AtomicLong
case class DeletionVectorWriteTransformer(
@@ -113,19 +112,6 @@ object DeletionVectorWriteTransformer {
StructField.apply("maxRowIndex", LongType, nullable = true)
))
- def encodeUUID(uuid: String, randomPrefix: String): String = {
- val uuidData = Codec.Base85Codec.encodeUUID(UUID.fromString(uuid))
- // This should always be true and we are relying on it for separating out
the
- // prefix again later without having to spend an extra character as a
separator.
- assert(uuidData.length == 20)
- // uuidData
- s"$randomPrefix$uuidData"
- }
-
- def decodeUUID(encodedUuid: String): String = {
- Codec.Base85Codec.decodeUUID(encodedUuid).toString
- }
-
def replace(
aggregated: DataFrame,
tablePath: Path,
diff --git a/cpp-ch/local-engine/Common/Base85Codec.cpp
b/cpp-ch/local-engine/Common/Base85Codec.cpp
new file mode 100644
index 0000000000..688c2413f1
--- /dev/null
+++ b/cpp-ch/local-engine/Common/Base85Codec.cpp
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "Base85Codec.h"
+
+#include <IO/ReadBufferFromString.h>
+#include <IO/ReadHelpers.h>
+#include <IO/WriteHelpers.h>
+#include <Common/Exception.h>
+
+
+namespace DB::ErrorCodes
+{
+extern const int BAD_ARGUMENTS;
+}
+
+namespace local_engine
+{
+namespace
+{
+/// Index, inside `UUID::toUnderType().items`, of the 64-bit half that is serialized first
+/// (UUID_HIGH_ITEM) and second (UUID_LOW_ITEM). On little-endian hosts items[0] is written first,
+/// on big-endian hosts items[1] is. Encoding and decoding share these indices so the two
+/// directions stay symmetric on both kinds of host.
+#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
+constexpr size_t UUID_HIGH_ITEM = 0;
+constexpr size_t UUID_LOW_ITEM = 1;
+#else
+constexpr size_t UUID_HIGH_ITEM = 1;
+constexpr size_t UUID_LOW_ITEM = 0;
+#endif
+
+/// Stores `value` into dst[0 .. sizeof(T)), most significant byte first.
+template <typename T>
+void storeBigEndian(char * dst, T value)
+{
+    for (size_t i = 0; i < sizeof(T); ++i)
+        dst[i] = static_cast<char>(value >> (8 * (sizeof(T) - 1 - i)));
+}
+
+/// Loads a T from src[0 .. sizeof(T)), most significant byte first.
+template <typename T>
+T loadBigEndian(const char * src)
+{
+    T value = 0;
+    for (size_t i = 0; i < sizeof(T); ++i)
+        value = static_cast<T>((value << 8) | static_cast<unsigned char>(src[i]));
+    return value;
+}
+}
+
+/// Serializes `uuid` into 16 bytes: the UUID_HIGH_ITEM half first, then the UUID_LOW_ITEM half,
+/// each written big-endian. The byte layout is independent of host endianness.
+String uuidToByteBuffer(const DB::UUID & uuid)
+{
+    const auto & value = uuid.toUnderType();
+    String result(16, '\0');
+    storeBigEndian<UInt64>(result.data(), value.items[UUID_HIGH_ITEM]);
+    storeBigEndian<UInt64>(result.data() + 8, value.items[UUID_LOW_ITEM]);
+    return result;
+}
+
+/// Inverse of uuidToByteBuffer. Only the first 16 bytes of `buffer` are used.
+/// Throws BAD_ARGUMENTS if `buffer` is shorter than 16 bytes.
+DB::UUID uuidFromByteBuffer(const String & buffer)
+{
+    if (buffer.size() < 16)
+        throw DB::Exception(
+            DB::ErrorCodes::BAD_ARGUMENTS, "Cannot decode UUID: expected at least 16 bytes, got {}", buffer.size());
+
+    DB::UUID uuid;
+    uuid.toUnderType().items[UUID_HIGH_ITEM] = loadBigEndian<UInt64>(buffer.data());
+    uuid.toUnderType().items[UUID_LOW_ITEM] = loadBigEndian<UInt64>(buffer.data() + 8);
+    return uuid;
+}
+
+/// Encodes the 16 serialized UUID bytes as four 5-character Base85 groups (20 characters).
+String Base85Codec::encodeUUID(const DB::UUID & uuid)
+{
+    return encodeBlocks(uuidToByteBuffer(uuid));
+}
+
+/// Decodes a string produced by encodeUUID.
+/// Throws BAD_ARGUMENTS if the input is misaligned, contains invalid characters, or is too short.
+DB::UUID Base85Codec::decodeUUID(const String & encoded)
+{
+    return uuidFromByteBuffer(decodeBlocks(encoded));
+}
+
+/// Encodes `blocks`, whose length must be a multiple of 4. Each 4-byte big-endian block becomes
+/// 5 characters of ENCODE_MAP, most significant base-85 digit first.
+String Base85Codec::encodeBlocks(const String & blocks)
+{
+    if (blocks.size() % 4 != 0)
+        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Input should be 4 byte aligned, got {} bytes", blocks.size());
+
+    String output(blocks.size() / 4 * 5, '\0');
+    for (size_t in_pos = 0, out_pos = 0; in_pos < blocks.size(); in_pos += 4, out_pos += 5)
+    {
+        // Treat the block as an unsigned 32-bit value; 85^5 > 2^32, so five digits always suffice.
+        Int64 sum = loadBigEndian<UInt32>(blocks.data() + in_pos);
+        output[out_pos] = ENCODE_MAP[static_cast<size_t>(sum / BASE_4TH_POWER)];
+        sum %= BASE_4TH_POWER;
+        output[out_pos + 1] = ENCODE_MAP[static_cast<size_t>(sum / BASE_3RD_POWER)];
+        sum %= BASE_3RD_POWER;
+        output[out_pos + 2] = ENCODE_MAP[static_cast<size_t>(sum / BASE_2ND_POWER)];
+        sum %= BASE_2ND_POWER;
+        output[out_pos + 3] = ENCODE_MAP[static_cast<size_t>(sum / BASE)];
+        output[out_pos + 4] = ENCODE_MAP[static_cast<size_t>(sum % BASE)];
+    }
+    return output;
+}
+
+/// Decodes `encoded`, whose length must be a multiple of 5, into 4 bytes per 5-character group.
+/// Throws BAD_ARGUMENTS on misaligned input or on any character outside ENCODE_MAP.
+String Base85Codec::decodeBlocks(const String & encoded)
+{
+    if (encoded.size() % 5 != 0)
+        throw DB::Exception(
+            DB::ErrorCodes::BAD_ARGUMENTS, "Input should be 5 character aligned, got {} characters", encoded.size());
+
+    // Sized exactly up front and filled by index, so empty input simply yields an empty result.
+    String result(encoded.size() / 5 * 4, '\0');
+
+    // Detects invalid characters with a single check after the loop instead of a branch per character.
+    Int32 canary = 0;
+    auto decode_input_char = [&encoded, &canary](size_t i) -> Int64
+    {
+        const auto c = static_cast<unsigned char>(encoded[i]);
+        canary |= c; // a non-ASCII byte has bits outside ASCII_BITMASK
+        // Normalize to a signed byte so the -1 sentinel reads the same whatever the signedness of `char`.
+        const auto b = static_cast<Int8>(DECODE_MAP[c & ASCII_BITMASK]);
+        canary |= b; // an invalid char maps to -1, which sets bits outside ASCII_BITMASK
+        return static_cast<Int64>(b);
+    };
+
+    for (size_t in_pos = 0, out_pos = 0; out_pos < result.size(); in_pos += 5, out_pos += 4)
+    {
+        Int64 sum = 0;
+        sum += decode_input_char(in_pos) * BASE_4TH_POWER;
+        sum += decode_input_char(in_pos + 1) * BASE_3RD_POWER;
+        sum += decode_input_char(in_pos + 2) * BASE_2ND_POWER;
+        sum += decode_input_char(in_pos + 3) * BASE;
+        sum += decode_input_char(in_pos + 4);
+        storeBigEndian<UInt32>(result.data() + out_pos, static_cast<UInt32>(sum));
+    }
+
+    if ((canary & ~ASCII_BITMASK) != 0)
+        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Input is not valid Z85: {}", encoded);
+    return result;
+}
+
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Common/Base85Codec.h
b/cpp-ch/local-engine/Common/Base85Codec.h
new file mode 100644
index 0000000000..c2e47c41d5
--- /dev/null
+++ b/cpp-ch/local-engine/Common/Base85Codec.h
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <Core/Types.h>
+#include <Core/Types_fwd.h>
+
+namespace local_engine
+{
+
+/// Base85 alphabet (Z85 ordering): the position of a character is its base-85 digit value.
+/// Intended to match Delta Lake's `Codec.Base85Codec`, which the JVM-side path used before.
+inline constexpr char ENCODE_MAP[] = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ.-:+=^!/*?&<>()[]{}@%$#";
+inline constexpr Int64 BASE = 85L;
+inline constexpr Int64 BASE_2ND_POWER = BASE * BASE; // 7225
+inline constexpr Int64 BASE_3RD_POWER = BASE_2ND_POWER * BASE; // 614125
+inline constexpr Int64 BASE_4TH_POWER = BASE_3RD_POWER * BASE; // 52200625
+/// Largest 7-bit ASCII code; any input byte with bits outside this mask cannot be a Base85 digit.
+inline constexpr Int32 ASCII_BITMASK = 0x7F;
+
+/// Reverse lookup for ENCODE_MAP, indexed by 7-bit ASCII code. Holds the digit value (0..84) for
+/// alphabet characters and -1 for every other code. Entries are signed Int8, so the -1 sentinel is
+/// the same on platforms where plain `char` is unsigned.
+struct Base85DecodeMap
+{
+    Int8 digits[ASCII_BITMASK + 1];
+
+    constexpr Int8 operator[](size_t ascii) const { return digits[ascii]; }
+};
+
+constexpr Base85DecodeMap generateDecodeMap()
+{
+    Base85DecodeMap result{};
+    for (auto & digit : result.digits)
+        digit = -1;
+    for (size_t i = 0; ENCODE_MAP[i] != '\0'; ++i)
+        result.digits[static_cast<unsigned char>(ENCODE_MAP[i])] = static_cast<Int8>(i);
+    return result;
+}
+
+/// Built at compile time, so there is no per-translation-unit static initializer.
+inline constexpr Base85DecodeMap DECODE_MAP = generateDecodeMap();
+
+/// Base85 codec used by the Delta writer to encode deletion-vector UUIDs.
+/// A UUID is serialized as 16 big-endian bytes and encoded into 20 characters.
+class Base85Codec
+{
+public:
+    /// Encodes `uuid` into a 20-character Base85 string.
+    static String encodeUUID(const DB::UUID & uuid);
+    /// Inverse of encodeUUID.
+    static DB::UUID decodeUUID(const String & encoded);
+
+private:
+    /// Encodes a byte string whose length is a multiple of 4; each 4-byte block becomes 5 characters.
+    static String encodeBlocks(const String & blocks);
+    /// Decodes a string whose length is a multiple of 5; each 5-character group becomes 4 bytes.
+    static String decodeBlocks(const String & encoded);
+};
+}
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaUtil.cpp
b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaUtil.cpp
deleted file mode 100644
index 725e3446a2..0000000000
--- a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaUtil.cpp
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "DeltaUtil.h"
-
-#include <Compression/CompressedReadBuffer.h>
-#include <Core/Block.h>
-#include <IO/ReadBuffer.h>
-#include <jni/jni_common.h>
-#include <Common/JNIUtils.h>
-
-namespace local_engine::delta
-{
-
-jclass DeltaUtil::delta_jni_class = nullptr;
-jmethodID DeltaUtil::delta_jni_encode_uuid = nullptr;
-jmethodID DeltaUtil::delta_jni_decode_uuid = nullptr;
-
-void DeltaUtil::initJavaCallerReference(JNIEnv * env)
-{
- delta_jni_class = CreateGlobalClassReference(env,
"Lorg/apache/gluten/vectorized/DeltaWriterJNIWrapper;");
- delta_jni_encode_uuid
- = GetStaticMethodID(env, delta_jni_class, "encodeUUID",
"(Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;");
- delta_jni_decode_uuid = GetStaticMethodID(env, delta_jni_class,
"decodeUUID", "(Ljava/lang/String;)Ljava/lang/String;");
-}
-
-void DeltaUtil::releaseJavaCallerReference(JNIEnv * env)
-{
- if (delta_jni_class)
- env->DeleteGlobalRef(delta_jni_class);
-}
-
-String DeltaUtil::encodeUUID(String uuid, String prefix)
-{
- GET_JNIENV(env)
- jstring jUuid = env->NewStringUTF(uuid.c_str());
- jstring jRandomPrefix = env->NewStringUTF(prefix.c_str());
-
- // Call the static Java method
- jstring jResult = (jstring)env->CallStaticObjectMethod(delta_jni_class,
delta_jni_encode_uuid, jUuid, jRandomPrefix);
- const char * resultCStr = env->GetStringUTFChars(jResult, nullptr);
- std::string encode(resultCStr);
- env->ReleaseStringUTFChars(jResult, resultCStr);
- env->DeleteLocalRef(jUuid);
- env->DeleteLocalRef(jRandomPrefix);
- env->DeleteLocalRef(jResult);
- CLEAN_JNIENV
- return encode;
-}
-
-String DeltaUtil::decodeUUID(String encodedUuid)
-{
- GET_JNIENV(env)
- jstring j_encoded_uuid = env->NewStringUTF(encodedUuid.c_str());
- jstring jResult = (jstring)env->CallStaticObjectMethod(delta_jni_class,
delta_jni_decode_uuid, j_encoded_uuid);
- const char * resultCStr = env->GetStringUTFChars(jResult, nullptr);
- std::string decode_uuid(resultCStr);
-
- env->ReleaseStringUTFChars(jResult, resultCStr);
- env->DeleteLocalRef(j_encoded_uuid);
- env->DeleteLocalRef(jResult);
- CLEAN_JNIENV
- return decode_uuid;
-}
-
-}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaUtil.h
b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaUtil.h
deleted file mode 100644
index d4f80b7d05..0000000000
--- a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaUtil.h
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include <jni.h>
-#include <jni/jni_common.h>
-
-
-namespace local_engine::delta
-{
-struct Codec
-{
- struct Base85Codec
- {
- static constexpr Int32 ENCODED_UUID_LENGTH = 20;
- };
-};
-
-
-static constexpr String UUID_DV_MARKER = "u";
-
-class DeltaUtil
-{
-public:
- static void initJavaCallerReference(JNIEnv * env);
- static void releaseJavaCallerReference(JNIEnv * env);
-
- static String encodeUUID(String uuid, String prefix);
- static String decodeUUID(String encodedUuid);
-
-private:
- static jclass delta_jni_class;
- static jmethodID delta_jni_encode_uuid;
- static jmethodID delta_jni_decode_uuid;
-};
-
-}
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.cpp
b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.cpp
index 2997e990b5..a074b5876b 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.cpp
@@ -31,9 +31,9 @@
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <Storages/Output/WriteBufferBuilder.h>
-#include <Storages/SubstraitSource/Delta/DeltaUtil.h>
#include <rapidjson/document.h>
#include <Poco/URI.h>
+#include <Common/Base85Codec.h>
namespace local_engine::delta
{
@@ -50,6 +50,33 @@ String getRandomPrefix(const size_t & length)
return res;
}
+DB::ColumnTuple::MutablePtr createDeletionVectorDescriptorColumn()
+{
+    /// Build one empty sub-column per deletion vector descriptor field, then pack them into a tuple
+    /// column in field order. `offset` and `maxRowIndex` are nullable; the rest are not.
+    auto storage_type = DB::ColumnString::create();
+    auto path_or_inline_dv = DB::ColumnString::create();
+    auto offset = DB::ColumnNullable::create(DB::ColumnInt32::create(), DB::ColumnUInt8::create());
+    auto size_in_bytes = DB::ColumnInt32::create();
+    auto cardinality = DB::ColumnInt64::create();
+    auto max_row_index = DB::ColumnNullable::create(DB::ColumnInt64::create(), DB::ColumnUInt8::create());
+
+    DB::MutableColumns fields;
+    fields.reserve(6);
+    fields.emplace_back(std::move(storage_type));
+    fields.emplace_back(std::move(path_or_inline_dv));
+    fields.emplace_back(std::move(offset));
+    fields.emplace_back(std::move(size_in_bytes));
+    fields.emplace_back(std::move(cardinality));
+    fields.emplace_back(std::move(max_row_index));
+
+    return DB::ColumnTuple::create(std::move(fields));
+}
+
+DB::Tuple createDeletionVectorDescriptorField(
+    const String & path_or_inline_dv, const Int32 & offset, const Int32 & size_in_bytes, const Int64 & cardinality)
+{
+    /// Element order mirrors the sub-columns built by createDeletionVectorDescriptorColumn():
+    /// storageType (always UUID_DV_MARKER), pathOrInlineDv, offset, sizeInBytes, cardinality,
+    /// and maxRowIndex, which is left NULL.
+    return DB::Tuple{
+        DB::Field(UUID_DV_MARKER),
+        DB::Field(path_or_inline_dv),
+        DB::Field(offset),
+        DB::Field(size_in_bytes),
+        DB::Field(cardinality),
+        DB::Field{}};
+}
+
DB::DataTypePtr getDeletionVectorType()
{
DB::DataTypes dv_descriptor_types;
@@ -80,6 +107,24 @@ DB::DataTypePtr getDeletionVectorType()
return std::make_shared<DB::DataTypeTuple>(dv_descriptor_types,
dv_descriptor_names);
}
+String assembleDeletionVectorPath(const String & table_path, const String & prefix, const String & uuid)
+{
+    /// Layout: <table_path>/[<prefix>/]<DELETION_VECTOR_FILE_NAME_CORE>_<uuid>.bin
+    /// The prefix directory level is omitted when `prefix` is empty.
+    const String directory = prefix.empty() ? table_path + "/" : table_path + "/" + prefix + "/";
+    return directory + DELETION_VECTOR_FILE_NAME_CORE + "_" + uuid + ".bin";
+}
+
+DeltaWriter::DeltaWriter(
+    const DB::ContextPtr & context_, const String & table_path_, const size_t & prefix_length_, const size_t & packing_target_size_)
+    : context(context_)
+    , table_path(table_path_)
+    , prefix_length(prefix_length_)
+    , packing_target_size(packing_target_size_)
+{
+    /// Start with empty result columns. writeDeletionVector() appends one file path and one
+    /// descriptor tuple per written bitmap to file_path_column and dv_descriptor_column.
+    matched_row_count_col = DB::ColumnInt64::create();
+    dv_descriptor_column = createDeletionVectorDescriptorColumn();
+    file_path_column = DB::ColumnString::create();
+}
void DeltaWriter::writeDeletionVector(const DB::Block & block)
{
@@ -149,8 +194,8 @@ void DeltaWriter::writeDeletionVector(const DB::Block &
block)
Int32 checksum_value = static_cast<Int32>(crc32_z(0L,
reinterpret_cast<const unsigned char *>(bitmap.c_str()), bitmap_size));
DB::writeBinaryBigEndian(checksum_value, *write_buffer);
- auto dv_descriptor_field
- = createDeletionVectorDescriptorField(DeltaUtil::encodeUUID(uuid,
prefix), offset, bitmap_size, cardinality);
+ auto encoded = Base85Codec::encodeUUID(uuid);
+ auto dv_descriptor_field = createDeletionVectorDescriptorField(prefix
+ encoded, offset, bitmap_size, cardinality);
file_path_column->insert(file_path.data);
dv_descriptor_column->insert(dv_descriptor_field);
@@ -173,34 +218,9 @@ DB::Block * DeltaWriter::finalize()
return res;
}
-
-DB::ColumnTuple::MutablePtr DeltaWriter::createDeletionVectorDescriptorColumn()
-{
- DB::MutableColumns dv_descriptor_mutable_columns;
- dv_descriptor_mutable_columns.emplace_back(DB::ColumnString::create()); //
storageType
- dv_descriptor_mutable_columns.emplace_back(DB::ColumnString::create()); //
pathOrInlineDv
-
dv_descriptor_mutable_columns.emplace_back(DB::ColumnNullable::create(DB::ColumnInt32::create(),
DB::ColumnUInt8::create())); // offset
- dv_descriptor_mutable_columns.emplace_back(DB::ColumnInt32::create()); //
sizeInBytes
- dv_descriptor_mutable_columns.emplace_back(DB::ColumnInt64::create()); //
cardinality
- dv_descriptor_mutable_columns.emplace_back(
- DB::ColumnNullable::create(DB::ColumnInt64::create(),
DB::ColumnUInt8::create())); // maxRowIndex
-
- return DB::ColumnTuple::create(std::move(dv_descriptor_mutable_columns));
-}
-
-String DeltaWriter::assembleDeletionVectorPath(const String & table_path,
const String & prefix, const String & uuid) const
+std::unique_ptr<DB::WriteBuffer> DeltaWriter::createWriteBuffer(const String &
table_path, const String & prefix) const
{
- String path = table_path + "/";
- if (!prefix.empty())
- path += prefix + "/";
-
- path += DELETION_VECTOR_FILE_NAME_CORE + "_" + uuid + ".bin";
- return path;
-}
-
-std::unique_ptr<DB::WriteBuffer> DeltaWriter::createWriteBuffer(const String &
table_path, const String & prefix, const String & uuid) const
-{
- String dv_file = assembleDeletionVectorPath(table_path, prefix, uuid);
+ const String dv_file = assembleDeletionVectorPath(table_path, prefix,
toString(uuid));
std::string encoded;
Poco::URI::encode(dv_file, "", encoded);
@@ -215,36 +235,24 @@ DeltaDVRoaringBitmapArray
DeltaWriter::deserializeExistingBitmap(
const Int32 & existing_size_in_bytes,
const String & table_path) const
{
- const auto random_prefix_length = existing_path_or_inline_dv.length() -
Codec::Base85Codec::ENCODED_UUID_LENGTH;
+ static constexpr size_t ENCODED_UUID_LENGTH = 20;
+ const auto random_prefix_length = existing_path_or_inline_dv.length() -
ENCODED_UUID_LENGTH;
const auto randomPrefix = existing_path_or_inline_dv.substr(0,
random_prefix_length);
const auto encoded_uuid =
existing_path_or_inline_dv.substr(random_prefix_length);
- const auto existing_decode_uuid = DeltaUtil::decodeUUID(encoded_uuid);
- const String existing_dv_file = assembleDeletionVectorPath(table_path,
randomPrefix, existing_decode_uuid);
+ const auto existing_decode_uuid = Base85Codec::decodeUUID(encoded_uuid);
+ const String existing_dv_file = assembleDeletionVectorPath(table_path,
randomPrefix, toString(existing_decode_uuid));
DeltaDVRoaringBitmapArray existing_bitmap;
existing_bitmap.rb_read(existing_dv_file, existing_offset,
existing_size_in_bytes, context);
return existing_bitmap;
}
-DB::Tuple DeltaWriter::createDeletionVectorDescriptorField(
- const String & path_or_inline_dv, const Int32 & offset, const Int32 &
size_in_bytes, const Int64 & cardinality)
-{
- DB::Tuple tuple;
- tuple.emplace_back(UUID_DV_MARKER); // storageType
- tuple.emplace_back(path_or_inline_dv); // pathOrInlineDv
- tuple.emplace_back(offset); // offset
- tuple.emplace_back(size_in_bytes); // sizeInBytes
- tuple.emplace_back(cardinality); // cardinality
- tuple.emplace_back(DB::Field{}); // maxRowIndex
- return tuple;
-}
-
void DeltaWriter::initBinPackage()
{
offset = 0;
size_of_current_bin = 0;
prefix = getRandomPrefix(prefix_length);
- uuid = DB::toString(DB::UUIDHelpers::generateV4());
- write_buffer = createWriteBuffer(table_path, prefix, uuid);
+ uuid = DB::UUIDHelpers::generateV4();
+ write_buffer = createWriteBuffer(table_path, prefix);
DB::writeIntBinary(DV_FILE_FORMAT_VERSION_ID_V1, *write_buffer);
offset++;
}
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.h
b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.h
index 5a08a33169..ce41ca3476 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.h
@@ -26,41 +26,26 @@
namespace local_engine::delta
{
-
-struct DeletionVectorDescriptor
-{
-};
+static constexpr String UUID_DV_MARKER = "u";
+static constexpr String DELETION_VECTOR_FILE_NAME_CORE = "deletion_vector";
class DeltaWriter
{
- static constexpr String UUID_DV_MARKER = "u";
- static constexpr String DELETION_VECTOR_FILE_NAME_CORE = "deletion_vector";
-
public:
explicit DeltaWriter(
- const DB::ContextPtr & context_, const String & table_path_, const
size_t & prefix_length_, const size_t & packing_target_size_)
- : context(context_), table_path(table_path_),
prefix_length(prefix_length_), packing_target_size(packing_target_size_)
- {
- file_path_column = DB::ColumnString::create();
- dv_descriptor_column = createDeletionVectorDescriptorColumn();
- matched_row_count_col = DB::ColumnInt64::create();
- }
+ const DB::ContextPtr & context_, const String & table_path_, const
size_t & prefix_length_, const size_t & packing_target_size_);
void
writeDeletionVector(const DB::Block & block);
DB::Block * finalize();
private:
- DB::ColumnTuple::MutablePtr createDeletionVectorDescriptorColumn();
- String assembleDeletionVectorPath(const String & table_path, const String
& prefix, const String & uuid) const;
- std::unique_ptr<DB::WriteBuffer> createWriteBuffer(const String &
table_path, const String & prefix, const String & uuid) const;
+ std::unique_ptr<DB::WriteBuffer> createWriteBuffer(const String &
table_path, const String & prefix) const;
DeltaDVRoaringBitmapArray deserializeExistingBitmap(
const String & existing_path_or_inline_dv,
const Int32 & existing_offset,
const Int32 & existing_size_in_bytes,
const String & table_path) const;
- DB::Tuple createDeletionVectorDescriptorField(
- const String & path_or_inline_dv, const Int32 & offset, const Int32 &
size_in_bytes, const Int64 & cardinality);
void initBinPackage();
@@ -77,7 +62,7 @@ private:
size_t offset = 0;
size_t size_of_current_bin = 0;
String prefix;
- String uuid;
+ DB::UUID uuid;
};
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp
b/cpp-ch/local-engine/local_engine_jni.cpp
index b9497aa920..5dac360220 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -47,8 +47,6 @@
#include <Storages/Output/BlockStripeSplitter.h>
#include <Storages/Output/NormalFileWriter.h>
#include <Storages/SubstraitSource/Delta/DeltaWriter.h>
-#include <Storages/SubstraitSource/Delta/DeltaUtil.h>
-#include <Storages/SubstraitSource/ReadBufferBuilder.h>
#include <jni/SharedPointerWrapper.h>
#include <jni/jni_common.h>
#include <jni/jni_error.h>
@@ -200,7 +198,6 @@ JNIEXPORT void
Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_n
local_engine::BroadCastJoinBuilder::destroy(env);
local_engine::SparkMergeTreeWriterJNI::destroy(env);
local_engine::SparkRowInfoJNI::destroy(env);
- local_engine::delta::DeltaUtil::releaseJavaCallerReference(env);
env->DeleteGlobalRef(block_stripes_class);
env->DeleteGlobalRef(split_result_class);
@@ -1425,13 +1422,6 @@
Java_org_apache_gluten_vectorized_DeltaWriterJNIWrapper_deletionVectorWriteFinal
LOCAL_ENGINE_JNI_METHOD_END(env, -1);
}
-JNIEXPORT void
Java_org_apache_gluten_vectorized_DeltaWriterJNIWrapper_registerNativeReference(JNIEnv
* env, jclass)
-{
- LOCAL_ENGINE_JNI_METHOD_START
- local_engine::delta::DeltaUtil::initJavaCallerReference(env);
- LOCAL_ENGINE_JNI_METHOD_END(env, );
-}
-
#ifdef __cplusplus
}
diff --git a/cpp-ch/local-engine/tests/gtest_clickhouse_roaring_bitmap.cpp
b/cpp-ch/local-engine/tests/gtest_clickhouse_roaring_bitmap.cpp
index d5194c9f01..8131c4c1c4 100644
--- a/cpp-ch/local-engine/tests/gtest_clickhouse_roaring_bitmap.cpp
+++ b/cpp-ch/local-engine/tests/gtest_clickhouse_roaring_bitmap.cpp
@@ -16,6 +16,8 @@
*/
#include <zlib.h>
#include <Core/Settings.h>
+#include <IO/ReadBufferFromString.h>
+#include <IO/ReadHelpers.h>
#include <Interpreters/Context.h>
#include <Parser/SerializedPlanParser.h>
#include <Storages/SubstraitSource/Delta/Bitmap/DeltaDVRoaringBitmapArray.h>
@@ -23,6 +25,7 @@
#include <gtest/gtest.h>
#include <tests/utils/gluten_test_util.h>
#include <roaring.hh>
+#include <Common/Base85Codec.h>
#include <Common/QueryContext.h>
namespace DB::Setting
@@ -222,4 +225,17 @@ TEST(Delta_DV, DeltaDVRoaringBitmapArray)
EXPECT_FALSE(bitmap_array3.rb_contains(10000000001));
EXPECT_FALSE(bitmap_array3.rb_contains(5000000001));
EXPECT_FALSE(bitmap_array3.rb_contains(3000000001));
+}
+
+TEST(Delta_DV, Base85Codec)
+{
+    // Known UUID / encoding pair.
+    const String uuid_str = "a5d455b6-92f1-4c89-a26d-93a1d5ce3e89";
+    DB::ReadBufferFromString rb(uuid_str);
+    UUID uuid;
+    readUUIDText(uuid, rb);
+
+    const String encoded = Base85Codec::encodeUUID(uuid);
+    // DeltaWriter::deserializeExistingBitmap splits pathOrInlineDv into <random prefix><encoded uuid>
+    // by assuming the encoded UUID is exactly 20 characters long.
+    EXPECT_EQ(static_cast<size_t>(20), encoded.size());
+    EXPECT_EQ("RpnINLjqk5Qhu9/!Y{vn", encoded);
+    EXPECT_EQ(uuid_str, toString(Base85Codec::decodeUUID(encoded)));
+
+    // All-zero UUID: every zero 4-byte block encodes to five '0' digits, and decoding restores it.
+    const UUID nil_uuid{};
+    const String nil_encoded = Base85Codec::encodeUUID(nil_uuid);
+    EXPECT_EQ("00000000000000000000", nil_encoded);
+    EXPECT_EQ("00000000-0000-0000-0000-000000000000", toString(Base85Codec::decodeUUID(nil_encoded)));
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]