This is an automated email from the ASF dual-hosted git repository.

loneylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new c2cfb8705a [CH] Support native base85codec (#9421)
c2cfb8705a is described below

commit c2cfb8705a418a09db3f781edbe0f5857894588e
Author: Shuai li <[email protected]>
AuthorDate: Fri Apr 25 17:02:44 2025 +0800

    [CH] Support native base85codec (#9421)
---
 .../gluten/vectorized/DeltaWriterJNIWrapper.java   |  13 --
 .../gluten/sql/shims/delta32/Delta32Shims.scala    |   7 --
 .../execution/DeletionVectorWriteTransformer.scala |  16 +--
 cpp-ch/local-engine/Common/Base85Codec.cpp         | 140 +++++++++++++++++++++
 cpp-ch/local-engine/Common/Base85Codec.h           |  55 ++++++++
 .../Storages/SubstraitSource/Delta/DeltaUtil.cpp   |  80 ------------
 .../Storages/SubstraitSource/Delta/DeltaUtil.h     |  51 --------
 .../Storages/SubstraitSource/Delta/DeltaWriter.cpp | 104 ++++++++-------
 .../Storages/SubstraitSource/Delta/DeltaWriter.h   |  25 +---
 cpp-ch/local-engine/local_engine_jni.cpp           |  10 --
 .../tests/gtest_clickhouse_roaring_bitmap.cpp      |  16 +++
 11 files changed, 273 insertions(+), 244 deletions(-)

diff --git a/backends-clickhouse/src-delta-32/main/java/org/apache/gluten/vectorized/DeltaWriterJNIWrapper.java b/backends-clickhouse/src-delta-32/main/java/org/apache/gluten/vectorized/DeltaWriterJNIWrapper.java
index ada976668c..1dfa67764d 100644
--- a/backends-clickhouse/src-delta-32/main/java/org/apache/gluten/vectorized/DeltaWriterJNIWrapper.java
+++ b/backends-clickhouse/src-delta-32/main/java/org/apache/gluten/vectorized/DeltaWriterJNIWrapper.java
@@ -16,7 +16,6 @@
  */
 package org.apache.gluten.vectorized;
 
-import org.apache.spark.sql.execution.DeletionVectorWriteTransformer;
 
 public class DeltaWriterJNIWrapper {
 
@@ -24,21 +23,9 @@ public class DeltaWriterJNIWrapper {
         // utility class
     }
 
-    public static native void registerNativeReference();
-
     public static native long createDeletionVectorWriter(String tablePath, int prefix_length, long packingTargetSize);
 
     public static native void deletionVectorWrite(long writer_address, long block_address);
 
     public static native long deletionVectorWriteFinalize(long writer_address);
-
-    // call from native
-    public static String encodeUUID(String uuid, String randomPrefix) {
-        return DeletionVectorWriteTransformer.encodeUUID(uuid, randomPrefix);
-    }
-
-    // call from native
-    public static String decodeUUID(String encodedUuid) {
-        return DeletionVectorWriteTransformer.decodeUUID(encodedUuid);
-    }
 }
diff --git a/backends-clickhouse/src-delta-32/main/scala/org/apache/gluten/sql/shims/delta32/Delta32Shims.scala b/backends-clickhouse/src-delta-32/main/scala/org/apache/gluten/sql/shims/delta32/Delta32Shims.scala
index 0244bf3ffd..52da7b7883 100644
--- a/backends-clickhouse/src-delta-32/main/scala/org/apache/gluten/sql/shims/delta32/Delta32Shims.scala
+++ b/backends-clickhouse/src-delta-32/main/scala/org/apache/gluten/sql/shims/delta32/Delta32Shims.scala
@@ -19,10 +19,7 @@ package org.apache.gluten.sql.shims.delta32
 import org.apache.gluten.execution.GlutenPlan
 import org.apache.gluten.extension.{DeltaExpressionExtensionTransformer, ExpressionExtensionTrait}
 import org.apache.gluten.sql.shims.DeltaShims
-import org.apache.gluten.vectorized.DeltaWriterJNIWrapper
 
-import org.apache.spark.SparkContext
-import org.apache.spark.api.plugin.PluginContext
 import org.apache.spark.sql.delta.DeltaParquetFileFormat
 import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
 import org.apache.spark.sql.delta.util.JsonUtils
@@ -44,10 +41,6 @@ class Delta32Shims extends DeltaShims {
     DeltaOptimizedWriterTransformer.from(plan)
   }
 
-  override def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = {
-    DeltaWriterJNIWrapper.registerNativeReference()
-  }
-
   override def registerExpressionExtension(): Unit = {
     ExpressionExtensionTrait.registerExpressionExtension(DeltaExpressionExtensionTransformer())
   }
diff --git a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/DeletionVectorWriteTransformer.scala b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/DeletionVectorWriteTransformer.scala
index 2715eff63e..56a756b012 100644
--- a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/DeletionVectorWriteTransformer.scala
+++ b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/DeletionVectorWriteTransformer.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
 import org.apache.spark.sql.delta.commands.DeletionVectorResult
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore
-import org.apache.spark.sql.delta.util.{Codec, Utils => DeltaUtils}
+import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
 import org.apache.spark.sql.execution.datasources.CallTransformer
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -38,7 +38,6 @@ import org.apache.spark.util.Utils
 
 import org.apache.hadoop.fs.Path
 
-import java.util.UUID
 import java.util.concurrent.atomic.AtomicLong
 
 case class DeletionVectorWriteTransformer(
@@ -113,19 +112,6 @@ object DeletionVectorWriteTransformer {
       StructField.apply("maxRowIndex", LongType, nullable = true)
     ))
 
-  def encodeUUID(uuid: String, randomPrefix: String): String = {
-    val uuidData = Codec.Base85Codec.encodeUUID(UUID.fromString(uuid))
-    // This should always be true and we are relying on it for separating out the
-    // prefix again later without having to spend an extra character as a separator.
-    assert(uuidData.length == 20)
-    // uuidData
-    s"$randomPrefix$uuidData"
-  }
-
-  def decodeUUID(encodedUuid: String): String = {
-    Codec.Base85Codec.decodeUUID(encodedUuid).toString
-  }
-
   def replace(
       aggregated: DataFrame,
       tablePath: Path,
diff --git a/cpp-ch/local-engine/Common/Base85Codec.cpp b/cpp-ch/local-engine/Common/Base85Codec.cpp
new file mode 100644
index 0000000000..688c2413f1
--- /dev/null
+++ b/cpp-ch/local-engine/Common/Base85Codec.cpp
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "Base85Codec.h"
+
+#include <IO/ReadBufferFromString.h>
+#include <IO/ReadHelpers.h>
+#include <IO/WriteHelpers.h>
+
+
+namespace local_engine
+{
+String uuidToByteBuffer(const DB::UUID & uuid)
+{
+    const UInt128 under_type = uuid.toUnderType();
+    long low = under_type.items[0];
+    long high = under_type.items[1];
+
+    String result(16, '\0');
+
+#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
+    const Int64 low_swap = __builtin_bswap64(low);
+    const Int64 high_swap = __builtin_bswap64(high);
+    memcpy(result.data(), &low_swap, 8);
+    memcpy(result.data() + 8, &high_swap, 8);
+#else
+    memcpy(result.data(), &high, 8);
+    memcpy(result.data() + 8, &low, 8);
+#endif
+
+    return result;
+}
+
+DB::UUID uuidFromByteBuffer(const String & buffer)
+{
+    chassert(buffer.size() >= 16);
+    DB::ReadBufferFromString buf(buffer);
+    Int64 lowBits;
+    Int64 highBits;
+    DB::readBinaryBigEndian(lowBits, buf);
+    DB::readBinaryBigEndian(highBits, buf);
+
+    DB::UUID uuid;
+    uuid.toUnderType().items[0] = lowBits;
+    uuid.toUnderType().items[1] = highBits;
+    return uuid;
+}
+
+
+String Base85Codec::encodeUUID(const DB::UUID & uuid)
+{
+    const String blocks = uuidToByteBuffer(uuid);
+    return encodeBlocks(blocks);
+}
+
+DB::UUID Base85Codec::decodeUUID(const String & encoded)
+{
+    const String blocks = decodeBlocks(encoded);
+    return uuidFromByteBuffer(blocks);
+}
+
+String Base85Codec::encodeBlocks(const String & blocks)
+{
+    chassert(blocks.size() % 4 == 0);
+    auto numBlocks = blocks.size() / 4;
+    // Every 4 byte block gets encoded into 5 bytes/chars
+    const auto outputLength = numBlocks * 5;
+    String output(outputLength, '\0');
+    size_t outputIndex = 0;
+
+    DB::ReadBufferFromString rb(blocks);
+    while (!rb.eof())
+    {
+        Int32 readInt;
+        DB::readBinaryBigEndian(readInt, rb);
+        Int64 sum = readInt & 0x00000000ffffffffL;
+        output[outputIndex] = ENCODE_MAP[static_cast<Int32>(sum / BASE_4TH_POWER)];
+        sum %= BASE_4TH_POWER;
+
+        output[outputIndex + 1] = ENCODE_MAP[static_cast<Int32>(sum / BASE_3RD_POWER)];
+        sum %= BASE_3RD_POWER;
+        output[outputIndex + 2] = ENCODE_MAP[static_cast<Int32>(sum / BASE_2ND_POWER)];
+        sum %= BASE_2ND_POWER;
+        output[outputIndex + 3] = ENCODE_MAP[static_cast<Int32>(sum / BASE)];
+        output[outputIndex + 4] = ENCODE_MAP[static_cast<Int32>(sum % BASE)];
+        outputIndex += 5;
+    }
+
+    return output;
+}
+
+String Base85Codec::decodeBlocks(const String & encoded)
+{
+    chassert(encoded.size() % 5 == 0);
+    String result(encoded.size() / 5 * 4, '\0');
+
+    // A mechanism to detect invalid characters in the input while decoding, that only has a
+    // single conditional at the very end, instead of branching for every character.
+    Int32 canary = 0;
+    auto decodeInputChar = [&encoded, &canary](Int32 i) -> Int64
+    {
+        const auto c = encoded[i];
+
+        canary |= c; // non-ascii char has bits outside of ASCII_BITMASK
+        const auto b = DECODE_MAP[c & ASCII_BITMASK];
+        canary |= b; // invalid char maps to -1, which has bits outside ASCII_BITMASK
+        return static_cast<Int64>(b);
+    };
+
+    Int32 inputIndex = 0;
+    DB::WriteBufferFromString buf(result);
+    while (buf.hasPendingData())
+    {
+        Int64 sum = 0L;
+        sum += decodeInputChar(inputIndex) * BASE_4TH_POWER;
+        sum += decodeInputChar(inputIndex + 1) * BASE_3RD_POWER;
+        sum += decodeInputChar(inputIndex + 2) * BASE_2ND_POWER;
+        sum += decodeInputChar(inputIndex + 3) * BASE;
+        sum += decodeInputChar(inputIndex + 4);
+        DB::writeBinaryBigEndian(static_cast<Int32>(sum), buf);
+        inputIndex += 5;
+    }
+    buf.finalize();
+    return result;
+}
+
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Common/Base85Codec.h b/cpp-ch/local-engine/Common/Base85Codec.h
new file mode 100644
index 0000000000..c2e47c41d5
--- /dev/null
+++ b/cpp-ch/local-engine/Common/Base85Codec.h
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <Core/Types.h>
+#include <Core/Types_fwd.h>
+
+namespace local_engine
+{
+
+static constexpr char * ENCODE_MAP = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ.-:+=^!/*?&<>()[]{}@%$#";
+static constexpr Int64 BASE = 85L;
+static constexpr Int64 BASE_2ND_POWER = 7225L; // 85^2
+static constexpr Int64 BASE_3RD_POWER = 614125L; // 85^3
+static constexpr Int64 BASE_4TH_POWER = 52200625L; // 85^4
+static constexpr Int32 ASCII_BITMASK = 0x7F;
+
+static String generateDecodeMap()
+{
+    // The bitmask is the same as largest possible value, so the length of the array must
+    // be one greater.
+    String result(ASCII_BITMASK + 1, 0xFF);
+    for (UInt8 i = 0; ENCODE_MAP[i] != '\0'; ++i)
+        result[ENCODE_MAP[i]] = i;
+
+    return result;
+}
+
+static const String DECODE_MAP = generateDecodeMap();
+
+class Base85Codec
+{
+public:
+    static String encodeUUID(const DB::UUID & uuid);
+    static DB::UUID decodeUUID(const String & encoded);
+
+private:
+    static String encodeBlocks(const String & blocks);
+    static String decodeBlocks(const String & encoded);
+};
+};
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaUtil.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaUtil.cpp
deleted file mode 100644
index 725e3446a2..0000000000
--- a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaUtil.cpp
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "DeltaUtil.h"
-
-#include <Compression/CompressedReadBuffer.h>
-#include <Core/Block.h>
-#include <IO/ReadBuffer.h>
-#include <jni/jni_common.h>
-#include <Common/JNIUtils.h>
-
-namespace local_engine::delta
-{
-
-jclass DeltaUtil::delta_jni_class = nullptr;
-jmethodID DeltaUtil::delta_jni_encode_uuid = nullptr;
-jmethodID DeltaUtil::delta_jni_decode_uuid = nullptr;
-
-void DeltaUtil::initJavaCallerReference(JNIEnv * env)
-{
-    delta_jni_class = CreateGlobalClassReference(env, 
"Lorg/apache/gluten/vectorized/DeltaWriterJNIWrapper;");
-    delta_jni_encode_uuid
-        = GetStaticMethodID(env, delta_jni_class, "encodeUUID", 
"(Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;");
-    delta_jni_decode_uuid = GetStaticMethodID(env, delta_jni_class, 
"decodeUUID", "(Ljava/lang/String;)Ljava/lang/String;");
-}
-
-void DeltaUtil::releaseJavaCallerReference(JNIEnv * env)
-{
-    if (delta_jni_class)
-        env->DeleteGlobalRef(delta_jni_class);
-}
-
-String DeltaUtil::encodeUUID(String uuid, String prefix)
-{
-    GET_JNIENV(env)
-    jstring jUuid = env->NewStringUTF(uuid.c_str());
-    jstring jRandomPrefix = env->NewStringUTF(prefix.c_str());
-
-    // Call the static Java method
-    jstring jResult = (jstring)env->CallStaticObjectMethod(delta_jni_class, 
delta_jni_encode_uuid, jUuid, jRandomPrefix);
-    const char * resultCStr = env->GetStringUTFChars(jResult, nullptr);
-    std::string encode(resultCStr);
-    env->ReleaseStringUTFChars(jResult, resultCStr);
-    env->DeleteLocalRef(jUuid);
-    env->DeleteLocalRef(jRandomPrefix);
-    env->DeleteLocalRef(jResult);
-    CLEAN_JNIENV
-    return encode;
-}
-
-String DeltaUtil::decodeUUID(String encodedUuid)
-{
-    GET_JNIENV(env)
-    jstring j_encoded_uuid = env->NewStringUTF(encodedUuid.c_str());
-    jstring jResult = (jstring)env->CallStaticObjectMethod(delta_jni_class, 
delta_jni_decode_uuid, j_encoded_uuid);
-    const char * resultCStr = env->GetStringUTFChars(jResult, nullptr);
-    std::string decode_uuid(resultCStr);
-
-    env->ReleaseStringUTFChars(jResult, resultCStr);
-    env->DeleteLocalRef(j_encoded_uuid);
-    env->DeleteLocalRef(jResult);
-    CLEAN_JNIENV
-    return decode_uuid;
-}
-
-}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaUtil.h b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaUtil.h
deleted file mode 100644
index d4f80b7d05..0000000000
--- a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaUtil.h
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include <jni.h>
-#include <jni/jni_common.h>
-
-
-namespace local_engine::delta
-{
-struct Codec
-{
-    struct Base85Codec
-    {
-        static constexpr Int32 ENCODED_UUID_LENGTH = 20;
-    };
-};
-
-
-static constexpr String UUID_DV_MARKER = "u";
-
-class DeltaUtil
-{
-public:
-    static void initJavaCallerReference(JNIEnv * env);
-    static void releaseJavaCallerReference(JNIEnv * env);
-
-    static String encodeUUID(String uuid, String prefix);
-    static String decodeUUID(String encodedUuid);
-
-private:
-    static jclass delta_jni_class;
-    static jmethodID delta_jni_encode_uuid;
-    static jmethodID delta_jni_decode_uuid;
-};
-
-}
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.cpp
index 2997e990b5..a074b5876b 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.cpp
@@ -31,9 +31,9 @@
 #include <IO/WriteBuffer.h>
 #include <IO/WriteHelpers.h>
 #include <Storages/Output/WriteBufferBuilder.h>
-#include <Storages/SubstraitSource/Delta/DeltaUtil.h>
 #include <rapidjson/document.h>
 #include <Poco/URI.h>
+#include <Common/Base85Codec.h>
 
 namespace local_engine::delta
 {
@@ -50,6 +50,33 @@ String getRandomPrefix(const size_t & length)
     return res;
 }
 
+DB::ColumnTuple::MutablePtr createDeletionVectorDescriptorColumn()
+{
+    DB::MutableColumns dv_descriptor_mutable_columns;
+    dv_descriptor_mutable_columns.emplace_back(DB::ColumnString::create()); // 
storageType
+    dv_descriptor_mutable_columns.emplace_back(DB::ColumnString::create()); // 
pathOrInlineDv
+    
dv_descriptor_mutable_columns.emplace_back(DB::ColumnNullable::create(DB::ColumnInt32::create(),
 DB::ColumnUInt8::create())); // offset
+    dv_descriptor_mutable_columns.emplace_back(DB::ColumnInt32::create()); // 
sizeInBytes
+    dv_descriptor_mutable_columns.emplace_back(DB::ColumnInt64::create()); // 
cardinality
+    dv_descriptor_mutable_columns.emplace_back(
+        DB::ColumnNullable::create(DB::ColumnInt64::create(), 
DB::ColumnUInt8::create())); // maxRowIndex
+
+    return DB::ColumnTuple::create(std::move(dv_descriptor_mutable_columns));
+}
+
+DB::Tuple createDeletionVectorDescriptorField(
+    const String & path_or_inline_dv, const Int32 & offset, const Int32 & 
size_in_bytes, const Int64 & cardinality)
+{
+    DB::Tuple tuple;
+    tuple.emplace_back(UUID_DV_MARKER); // storageType
+    tuple.emplace_back(path_or_inline_dv); // pathOrInlineDv
+    tuple.emplace_back(offset); // offset
+    tuple.emplace_back(size_in_bytes); // sizeInBytes
+    tuple.emplace_back(cardinality); // cardinality
+    tuple.emplace_back(DB::Field{}); // maxRowIndex
+    return tuple;
+}
+
 DB::DataTypePtr getDeletionVectorType()
 {
     DB::DataTypes dv_descriptor_types;
@@ -80,6 +107,24 @@ DB::DataTypePtr getDeletionVectorType()
     return std::make_shared<DB::DataTypeTuple>(dv_descriptor_types, 
dv_descriptor_names);
 }
 
+String assembleDeletionVectorPath(const String & table_path, const String & 
prefix, const String & uuid)
+{
+    String path = table_path + "/";
+    if (!prefix.empty())
+        path += prefix + "/";
+
+    path += DELETION_VECTOR_FILE_NAME_CORE + "_" + uuid + ".bin";
+    return path;
+}
+
+DeltaWriter::DeltaWriter(
+    const DB::ContextPtr & context_, const String & table_path_, const size_t 
& prefix_length_, const size_t & packing_target_size_)
+    : context(context_), table_path(table_path_), 
prefix_length(prefix_length_), packing_target_size(packing_target_size_)
+{
+    file_path_column = DB::ColumnString::create();
+    dv_descriptor_column = createDeletionVectorDescriptorColumn();
+    matched_row_count_col = DB::ColumnInt64::create();
+}
 
 void DeltaWriter::writeDeletionVector(const DB::Block & block)
 {
@@ -149,8 +194,8 @@ void DeltaWriter::writeDeletionVector(const DB::Block & block)
         Int32 checksum_value = static_cast<Int32>(crc32_z(0L, 
reinterpret_cast<const unsigned char *>(bitmap.c_str()), bitmap_size));
         DB::writeBinaryBigEndian(checksum_value, *write_buffer);
 
-        auto dv_descriptor_field
-            = createDeletionVectorDescriptorField(DeltaUtil::encodeUUID(uuid, 
prefix), offset, bitmap_size, cardinality);
+        auto encoded = Base85Codec::encodeUUID(uuid);
+        auto dv_descriptor_field = createDeletionVectorDescriptorField(prefix 
+ encoded, offset, bitmap_size, cardinality);
 
         file_path_column->insert(file_path.data);
         dv_descriptor_column->insert(dv_descriptor_field);
@@ -173,34 +218,9 @@ DB::Block * DeltaWriter::finalize()
     return res;
 }
 
-
-DB::ColumnTuple::MutablePtr DeltaWriter::createDeletionVectorDescriptorColumn()
-{
-    DB::MutableColumns dv_descriptor_mutable_columns;
-    dv_descriptor_mutable_columns.emplace_back(DB::ColumnString::create()); // 
storageType
-    dv_descriptor_mutable_columns.emplace_back(DB::ColumnString::create()); // 
pathOrInlineDv
-    
dv_descriptor_mutable_columns.emplace_back(DB::ColumnNullable::create(DB::ColumnInt32::create(),
 DB::ColumnUInt8::create())); // offset
-    dv_descriptor_mutable_columns.emplace_back(DB::ColumnInt32::create()); // 
sizeInBytes
-    dv_descriptor_mutable_columns.emplace_back(DB::ColumnInt64::create()); // 
cardinality
-    dv_descriptor_mutable_columns.emplace_back(
-        DB::ColumnNullable::create(DB::ColumnInt64::create(), 
DB::ColumnUInt8::create())); // maxRowIndex
-
-    return DB::ColumnTuple::create(std::move(dv_descriptor_mutable_columns));
-}
-
-String DeltaWriter::assembleDeletionVectorPath(const String & table_path, 
const String & prefix, const String & uuid) const
+std::unique_ptr<DB::WriteBuffer> DeltaWriter::createWriteBuffer(const String & 
table_path, const String & prefix) const
 {
-    String path = table_path + "/";
-    if (!prefix.empty())
-        path += prefix + "/";
-
-    path += DELETION_VECTOR_FILE_NAME_CORE + "_" + uuid + ".bin";
-    return path;
-}
-
-std::unique_ptr<DB::WriteBuffer> DeltaWriter::createWriteBuffer(const String & 
table_path, const String & prefix, const String & uuid) const
-{
-    String dv_file = assembleDeletionVectorPath(table_path, prefix, uuid);
+    const String dv_file = assembleDeletionVectorPath(table_path, prefix, 
toString(uuid));
 
     std::string encoded;
     Poco::URI::encode(dv_file, "", encoded);
@@ -215,36 +235,24 @@ DeltaDVRoaringBitmapArray DeltaWriter::deserializeExistingBitmap(
     const Int32 & existing_size_in_bytes,
     const String & table_path) const
 {
-    const auto random_prefix_length = existing_path_or_inline_dv.length() - 
Codec::Base85Codec::ENCODED_UUID_LENGTH;
+    static constexpr size_t ENCODED_UUID_LENGTH = 20;
+    const auto random_prefix_length = existing_path_or_inline_dv.length() - 
ENCODED_UUID_LENGTH;
     const auto randomPrefix = existing_path_or_inline_dv.substr(0, 
random_prefix_length);
     const auto encoded_uuid = 
existing_path_or_inline_dv.substr(random_prefix_length);
-    const auto existing_decode_uuid = DeltaUtil::decodeUUID(encoded_uuid);
-    const String existing_dv_file = assembleDeletionVectorPath(table_path, 
randomPrefix, existing_decode_uuid);
+    const auto existing_decode_uuid = Base85Codec::decodeUUID(encoded_uuid);
+    const String existing_dv_file = assembleDeletionVectorPath(table_path, 
randomPrefix, toString(existing_decode_uuid));
     DeltaDVRoaringBitmapArray existing_bitmap;
     existing_bitmap.rb_read(existing_dv_file, existing_offset, 
existing_size_in_bytes, context);
     return existing_bitmap;
 }
 
-DB::Tuple DeltaWriter::createDeletionVectorDescriptorField(
-    const String & path_or_inline_dv, const Int32 & offset, const Int32 & 
size_in_bytes, const Int64 & cardinality)
-{
-    DB::Tuple tuple;
-    tuple.emplace_back(UUID_DV_MARKER); // storageType
-    tuple.emplace_back(path_or_inline_dv); // pathOrInlineDv
-    tuple.emplace_back(offset); // offset
-    tuple.emplace_back(size_in_bytes); // sizeInBytes
-    tuple.emplace_back(cardinality); // cardinality
-    tuple.emplace_back(DB::Field{}); // maxRowIndex
-    return tuple;
-}
-
 void DeltaWriter::initBinPackage()
 {
     offset = 0;
     size_of_current_bin = 0;
     prefix = getRandomPrefix(prefix_length);
-    uuid = DB::toString(DB::UUIDHelpers::generateV4());
-    write_buffer = createWriteBuffer(table_path, prefix, uuid);
+    uuid = DB::UUIDHelpers::generateV4();
+    write_buffer = createWriteBuffer(table_path, prefix);
     DB::writeIntBinary(DV_FILE_FORMAT_VERSION_ID_V1, *write_buffer);
     offset++;
 }
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.h b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.h
index 5a08a33169..ce41ca3476 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.h
@@ -26,41 +26,26 @@
 
 namespace local_engine::delta
 {
-
-struct DeletionVectorDescriptor
-{
-};
+static constexpr String UUID_DV_MARKER = "u";
+static constexpr String DELETION_VECTOR_FILE_NAME_CORE = "deletion_vector";
 
 class DeltaWriter
 {
-    static constexpr String UUID_DV_MARKER = "u";
-    static constexpr String DELETION_VECTOR_FILE_NAME_CORE = "deletion_vector";
-
 public:
     explicit DeltaWriter(
-        const DB::ContextPtr & context_, const String & table_path_, const 
size_t & prefix_length_, const size_t & packing_target_size_)
-        : context(context_), table_path(table_path_), 
prefix_length(prefix_length_), packing_target_size(packing_target_size_)
-    {
-        file_path_column = DB::ColumnString::create();
-        dv_descriptor_column = createDeletionVectorDescriptorColumn();
-        matched_row_count_col = DB::ColumnInt64::create();
-    }
+        const DB::ContextPtr & context_, const String & table_path_, const 
size_t & prefix_length_, const size_t & packing_target_size_);
     void
     writeDeletionVector(const DB::Block & block);
 
     DB::Block * finalize();
 
 private:
-    DB::ColumnTuple::MutablePtr createDeletionVectorDescriptorColumn();
-    String assembleDeletionVectorPath(const String & table_path, const String 
& prefix, const String & uuid) const;
-    std::unique_ptr<DB::WriteBuffer> createWriteBuffer(const String & 
table_path, const String & prefix, const String & uuid) const;
+    std::unique_ptr<DB::WriteBuffer> createWriteBuffer(const String & 
table_path, const String & prefix) const;
     DeltaDVRoaringBitmapArray deserializeExistingBitmap(
         const String & existing_path_or_inline_dv,
         const Int32 & existing_offset,
         const Int32 & existing_size_in_bytes,
         const String & table_path) const;
-    DB::Tuple createDeletionVectorDescriptorField(
-        const String & path_or_inline_dv, const Int32 & offset, const Int32 & 
size_in_bytes, const Int64 & cardinality);
 
     void initBinPackage();
 
@@ -77,7 +62,7 @@ private:
     size_t offset = 0;
     size_t size_of_current_bin = 0;
     String prefix;
-    String uuid;
+    DB::UUID uuid;
 };
 
 
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp
index b9497aa920..5dac360220 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -47,8 +47,6 @@
 #include <Storages/Output/BlockStripeSplitter.h>
 #include <Storages/Output/NormalFileWriter.h>
 #include <Storages/SubstraitSource/Delta/DeltaWriter.h>
-#include <Storages/SubstraitSource/Delta/DeltaUtil.h>
-#include <Storages/SubstraitSource/ReadBufferBuilder.h>
 #include <jni/SharedPointerWrapper.h>
 #include <jni/jni_common.h>
 #include <jni/jni_error.h>
@@ -200,7 +198,6 @@ JNIEXPORT void Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_n
     local_engine::BroadCastJoinBuilder::destroy(env);
     local_engine::SparkMergeTreeWriterJNI::destroy(env);
     local_engine::SparkRowInfoJNI::destroy(env);
-    local_engine::delta::DeltaUtil::releaseJavaCallerReference(env);
 
     env->DeleteGlobalRef(block_stripes_class);
     env->DeleteGlobalRef(split_result_class);
@@ -1425,13 +1422,6 @@ Java_org_apache_gluten_vectorized_DeltaWriterJNIWrapper_deletionVectorWriteFinal
     LOCAL_ENGINE_JNI_METHOD_END(env, -1);
 }
 
-JNIEXPORT void 
Java_org_apache_gluten_vectorized_DeltaWriterJNIWrapper_registerNativeReference(JNIEnv
 * env, jclass)
-{
-    LOCAL_ENGINE_JNI_METHOD_START
-    local_engine::delta::DeltaUtil::initJavaCallerReference(env);
-    LOCAL_ENGINE_JNI_METHOD_END(env, );
-}
-
 #ifdef __cplusplus
 }
 
diff --git a/cpp-ch/local-engine/tests/gtest_clickhouse_roaring_bitmap.cpp b/cpp-ch/local-engine/tests/gtest_clickhouse_roaring_bitmap.cpp
index d5194c9f01..8131c4c1c4 100644
--- a/cpp-ch/local-engine/tests/gtest_clickhouse_roaring_bitmap.cpp
+++ b/cpp-ch/local-engine/tests/gtest_clickhouse_roaring_bitmap.cpp
@@ -16,6 +16,8 @@
  */
 #include <zlib.h>
 #include <Core/Settings.h>
+#include <IO/ReadBufferFromString.h>
+#include <IO/ReadHelpers.h>
 #include <Interpreters/Context.h>
 #include <Parser/SerializedPlanParser.h>
 #include <Storages/SubstraitSource/Delta/Bitmap/DeltaDVRoaringBitmapArray.h>
@@ -23,6 +25,7 @@
 #include <gtest/gtest.h>
 #include <tests/utils/gluten_test_util.h>
 #include <roaring.hh>
+#include <Common/Base85Codec.h>
 #include <Common/QueryContext.h>
 
 namespace DB::Setting
@@ -222,4 +225,17 @@ TEST(Delta_DV, DeltaDVRoaringBitmapArray)
     EXPECT_FALSE(bitmap_array3.rb_contains(10000000001));
     EXPECT_FALSE(bitmap_array3.rb_contains(5000000001));
     EXPECT_FALSE(bitmap_array3.rb_contains(3000000001));
+}
+
+TEST(Delta_DV, Base85Codec)
+{
+    const String uuid_str = "a5d455b6-92f1-4c89-a26d-93a1d5ce3e89";
+    DB::ReadBufferFromString rb = DB::ReadBufferFromString(uuid_str);
+    UUID uuid;
+    readUUIDText(uuid, rb);
+
+    const String encoded = Base85Codec::encodeUUID(uuid);
+    EXPECT_EQ("RpnINLjqk5Qhu9/!Y{vn", encoded);
+    auto decodeUUID = Base85Codec::decodeUUID(encoded);
+    EXPECT_EQ(uuid_str, toString(decodeUUID));
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to