This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 50d658d710 [iceberg] Improved ByteBuffer string conversion for Iceberg manifests (#5008)
50d658d710 is described below

commit 50d658d7108ae2e7b3ed460ac1129aa4a2294d26
Author: 0dunay0 <[email protected]>
AuthorDate: Mon Feb 10 05:28:10 2025 +0000

    [iceberg] Improved ByteBuffer string conversion for Iceberg manifests (#5008)
---
 .../iceberg/manifest/IcebergConversions.java       |  14 +-
 .../paimon/iceberg/IcebergCompatibilityTest.java   | 105 ++++++++++++
 .../manifest/IcebergConversionsVarcharTest.java    | 181 +++++++++++++++++++++
 3 files changed, 299 insertions(+), 1 deletion(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java
index 9048d46e44..b20c55ee67 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java
@@ -73,7 +73,19 @@ public class IcebergConversions {
             case VARCHAR:
                 CharBuffer buffer = CharBuffer.wrap(value.toString());
                 try {
-                    return ENCODER.get().encode(buffer);
+                    ByteBuffer encoded = ENCODER.get().encode(buffer);
+                    // ByteBuffer and CharBuffer allocate space based on capacity
+                    // not actual content length. so we need to create a new ByteBuffer
+                    // with the exact length of the encoded content
+                    // to avoid padding the output with \u0000
+                    if (encoded.limit() != encoded.capacity()) {
+                        ByteBuffer exact = ByteBuffer.allocate(encoded.limit());
+                        encoded.position(0);
+                        exact.put(encoded);
+                        exact.flip();
+                        return exact;
+                    }
+                    return encoded;
                 } catch (CharacterCodingException e) {
                     throw new RuntimeException("Failed to encode value as UTF-8: " + value, e);
                 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
index 7258a1dd41..f29eb55113 100644
--- a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
@@ -53,6 +53,11 @@ import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.SeekableFileInput;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.IcebergGenerics;
@@ -63,7 +68,10 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
+import java.io.File;
 import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
@@ -595,6 +603,103 @@ public class IcebergCompatibilityTest {
                         "Record(2, {20=[Record(cherry, 200), Record(pear, 201)]})");
     }
 
+    @Test
+    public void testStringPartitionNullPadding() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.VARCHAR(20)},
+                        new String[] {"k", "country"});
+        FileStoreTable table =
+                createPaimonTable(
+                        rowType,
+                        Collections.singletonList("country"),
+                        Collections.singletonList("k"),
+                        -1);
+
+        String commitUser = UUID.randomUUID().toString();
+        TableWriteImpl<?> write = table.newWrite(commitUser);
+        TableCommitImpl commit = table.newCommit(commitUser);
+
+        write.write(GenericRow.of(1, BinaryString.fromString("Switzerland")), 1);
+        write.write(GenericRow.of(2, BinaryString.fromString("Australia")), 1);
+        write.write(GenericRow.of(3, BinaryString.fromString("Brazil")), 1);
+        write.write(GenericRow.of(4, BinaryString.fromString("Grand Duchy of Luxembourg")), 1);
+        commit.commit(1, write.prepareCommit(false, 1));
+        assertThat(getIcebergResult())
+                .containsExactlyInAnyOrder(
+                        "Record(1, Switzerland)",
+                        "Record(2, Australia)",
+                        "Record(3, Brazil)",
+                        "Record(4, Grand Duchy of Luxembourg)");
+
+        FileIO fileIO = table.fileIO();
+        IcebergMetadata metadata =
+                IcebergMetadata.fromPath(
+                        fileIO, new Path(table.location(), "metadata/v1.metadata.json"));
+
+        IcebergPathFactory pathFactory =
+                new IcebergPathFactory(new Path(table.location(), "metadata"));
+        IcebergManifestList manifestList = IcebergManifestList.create(table, pathFactory);
+        String currentSnapshotManifest = metadata.currentSnapshot().manifestList();
+
+        File snapShotAvroFile = new File(currentSnapshotManifest);
+        String expectedPartitionSummary =
+                "[{\"contains_null\": false, \"contains_nan\": false, \"lower_bound\": \"Australia\", \"upper_bound\": \"Switzerland\"}]";
+        try (DataFileReader<GenericRecord> dataFileReader =
+                new DataFileReader<>(
+                        new SeekableFileInput(snapShotAvroFile), new GenericDatumReader<>())) {
+            while (dataFileReader.hasNext()) {
+                GenericRecord record = dataFileReader.next();
+                String partitionSummary = record.get("partitions").toString();
+                assertThat(partitionSummary).doesNotContain("\\u0000");
+                assertThat(partitionSummary).isEqualTo(expectedPartitionSummary);
+            }
+        }
+
+        String tableManifest = manifestList.read(snapShotAvroFile.getName()).get(0).manifestPath();
+
+        try (DataFileReader<GenericRecord> dataFileReader =
+                new DataFileReader<>(
+                        new SeekableFileInput(new File(tableManifest)),
+                        new GenericDatumReader<>())) {
+
+            while (dataFileReader.hasNext()) {
+                GenericRecord record = dataFileReader.next();
+                GenericRecord dataFile = (GenericRecord) record.get("data_file");
+
+                // Check lower bounds
+                GenericData.Array<?> lowerBounds =
+                        (GenericData.Array<?>) dataFile.get("lower_bounds");
+                if (lowerBounds != null) {
+                    for (Object bound : lowerBounds) {
+                        GenericRecord boundRecord = (GenericRecord) bound;
+                        int key = (Integer) boundRecord.get("key");
+                        if (key == 1) { // key = 1 is the partition key
+                            ByteBuffer value = (ByteBuffer) boundRecord.get("value");
+                            String boundValue = new String(value.array(), StandardCharsets.UTF_8);
+                            assertThat(boundValue).doesNotContain("\u0000");
+                        }
+                    }
+                }
+
+                // Check upper bounds
+                GenericData.Array<?> upperBounds =
+                        (GenericData.Array<?>) dataFile.get("upper_bounds");
+                if (upperBounds != null) {
+                    for (Object bound : upperBounds) {
+                        GenericRecord boundRecord = (GenericRecord) bound;
+                        int key = (Integer) boundRecord.get("key");
+                        if (key == 1) { // key = 1 is the partition key
+                            ByteBuffer value = (ByteBuffer) boundRecord.get("value");
+                            String boundValue = new String(value.array(), StandardCharsets.UTF_8);
+                            assertThat(boundValue).doesNotContain("\u0000");
+                        }
+                    }
+                }
+            }
+        }
+    }
+
     // ------------------------------------------------------------------------
     //  Random Tests
     // ------------------------------------------------------------------------
diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/manifest/IcebergConversionsVarcharTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/manifest/IcebergConversionsVarcharTest.java
new file mode 100644
index 0000000000..c53e6bb0d0
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/manifest/IcebergConversionsVarcharTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class IcebergConversionsVarcharTest {
+
+    @Test
+    void testEmptyString() {
+        String empty = "";
+        ByteBuffer result = IcebergConversions.toByteBuffer(DataTypes.VARCHAR(10), empty);
+        String decodedString = new String(result.array(), StandardCharsets.UTF_8);
+        assertThat(result.array()).isEmpty();
+        assertThat(empty).isEqualTo(decodedString);
+    }
+
+    @Test
+    void testNullHandling() {
+        assertThatThrownBy(() -> IcebergConversions.toByteBuffer(DataTypes.VARCHAR(10), null))
+                .isInstanceOf(NullPointerException.class);
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideSpecialStrings")
+    @DisplayName("Test special string cases")
+    void testSpecialStrings(String input) {
+        ByteBuffer result = IcebergConversions.toByteBuffer(DataTypes.VARCHAR(100), input);
+        String decoded = new String(result.array(), 0, result.limit(), StandardCharsets.UTF_8);
+        assertThat(decoded).isEqualTo(input);
+    }
+
+    private static Stream<Arguments> provideSpecialStrings() {
+        return Stream.of(
+                Arguments.of("Hello\u0000World"), // Embedded null
+                Arguments.of("\n\r\t"), // Control characters
+                Arguments.of(" "), // Single space
+                Arguments.of("    "), // Multiple spaces
+                Arguments.of("①②③"), // Unicode numbers
+                Arguments.of("🌟🌞🌝"), // Emojis
+                Arguments.of("Hello\uD83D\uDE00World"), // Surrogate pairs
+                Arguments.of("\uFEFF"), // Byte Order Mark
+                Arguments.of("Hello\\World"), // Backslashes
+                Arguments.of("Hello\"World"), // Quotes
+                Arguments.of("Hello'World"), // Single quotes
+                Arguments.of("Hello\bWorld"), // Backspace
+                Arguments.of("Hello\fWorld") // Form feed
+                );
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideLongStrings")
+    void testLongStrings(String input) {
+        ByteBuffer result =
+                IcebergConversions.toByteBuffer(DataTypes.VARCHAR(input.length()), input);
+        String decoded = new String(result.array(), 0, result.limit(), StandardCharsets.UTF_8);
+        assertThat(decoded).isEqualTo(input).hasSize(input.length());
+    }
+
+    private static Stream<Arguments> provideLongStrings() {
+        return Stream.of(
+                Arguments.of(createString(1)),
+                Arguments.of(createString(10)),
+                Arguments.of(createString(100)),
+                Arguments.of(createString(1000)),
+                Arguments.of(createString(10000)));
+    }
+
+    private static String createString(int length) {
+        StringBuilder sb = new StringBuilder(length);
+        for (int i = 0; i < length; i++) {
+            sb.append('a');
+        }
+        return sb.toString();
+    }
+
+    @Test
+    void testMultiByteCharacters() {
+        String[] inputs = {
+            "中文", // Chinese
+            "한글", // Korean
+            "日本語", // Japanese
+            "🌟", // Emoji (4 bytes)
+            "Café", // Latin-1 Supplement
+            "Привет", // Cyrillic
+            "שָׁלוֹם", // Hebrew with combining marks
+            "ᄀᄁᄂᄃᄄ", // Hangul Jamo
+            "बहुत बढ़िया", // Devanagari
+            "العربية" // Arabic
+        };
+
+        for (String input : inputs) {
+            ByteBuffer result =
+                    IcebergConversions.toByteBuffer(DataTypes.VARCHAR(input.length() * 4), input);
+            String decoded = new String(result.array(), 0, result.limit(), StandardCharsets.UTF_8);
+            assertThat(decoded).isEqualTo(input);
+            assertThat(result.limit()).isGreaterThanOrEqualTo(input.length());
+        }
+    }
+
+    @Test
+    void testBufferProperties() {
+        String input = "Hello, World!";
+        ByteBuffer result =
+                IcebergConversions.toByteBuffer(DataTypes.VARCHAR(input.length()), input);
+
+        assertThat(result.limit()).isEqualTo(result.array().length);
+        assertThat(containsTrailingZeros(result)).isFalse();
+    }
+
+    @Test
+    void testConcurrentAccess() throws InterruptedException {
+        int threadCount = 10;
+        Thread[] threads = new Thread[threadCount];
+        String[] inputs = new String[threadCount];
+        ByteBuffer[] results = new ByteBuffer[threadCount];
+
+        for (int i = 0; i < threadCount; i++) {
+            final int index = i;
+            inputs[index] = "Thread" + index;
+            threads[index] =
+                    new Thread(
+                            () -> {
+                                results[index] =
+                                        IcebergConversions.toByteBuffer(
+                                                DataTypes.VARCHAR(inputs[index].length()),
+                                                inputs[index]);
+                            });
+            threads[index].start();
+        }
+
+        for (Thread thread : threads) {
+            thread.join();
+        }
+
+        for (int i = 0; i < threadCount; i++) {
+            String decoded =
+                    new String(results[i].array(), 0, results[i].limit(), StandardCharsets.UTF_8);
+            assertThat(decoded).isEqualTo(inputs[i]);
+        }
+    }
+
+    private boolean containsTrailingZeros(ByteBuffer buffer) {
+        byte[] array = buffer.array();
+        for (int i = buffer.limit(); i < array.length; i++) {
+            if (array[i] != 0) {
+                return true;
+            }
+        }
+        return false;
+    }
+}

Reply via email to