This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5282a1196 [hive]Pass Table into Split to Avoid Loading from FileSystem when createRecordReader (#2927)
5282a1196 is described below

commit 5282a119695503dad04b7a6b50290a72adaf20b5
Author: wgcn <[email protected]>
AuthorDate: Fri Mar 1 13:39:18 2024 +0800

    [hive]Pass Table into Split to Avoid Loading from FileSystem when createRecordReader (#2927)
---
 .../paimon/hive/mapred/PaimonInputFormat.java      |  3 +-
 .../paimon/hive/mapred/PaimonInputSplit.java       | 44 ++++++++++++++++--
 .../paimon/hive/mapred/PaimonRecordReader.java     |  3 +-
 .../paimon/hive/utils/HiveSplitGenerator.java      |  5 +-
 .../paimon/hive/mapred/PaimonInputSplitTest.java   | 54 +++++++++++++++++++++-
 .../paimon/hive/mapred/PaimonRecordReaderTest.java |  2 +-
 6 files changed, 102 insertions(+), 9 deletions(-)

diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
index a58472a11..81fced03f 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
@@ -48,8 +48,7 @@ public class PaimonInputFormat implements InputFormat<Void, 
RowDataContainer> {
     @Override
     public RecordReader<Void, RowDataContainer> getRecordReader(
             InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws 
IOException {
-        FileStoreTable table = createFileStoreTable(jobConf);
         PaimonInputSplit split = (PaimonInputSplit) inputSplit;
-        return createRecordReader(table, split, jobConf);
+        return createRecordReader(split, jobConf);
     }
 }
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputSplit.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputSplit.java
index 5b639808f..64cae0aef 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputSplit.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputSplit.java
@@ -21,7 +21,9 @@ package org.apache.paimon.hive.mapred;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataInputDeserializer;
 import org.apache.paimon.io.DataOutputSerializer;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.utils.InstantiationUtil;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileSplit;
@@ -41,12 +43,15 @@ public class PaimonInputSplit extends FileSplit {
     private String path;
     private DataSplit split;
 
+    private FileStoreTable table;
+
     // public no-argument constructor for deserialization
     public PaimonInputSplit() {}
 
-    public PaimonInputSplit(String path, DataSplit split) {
+    public PaimonInputSplit(String path, DataSplit split, FileStoreTable 
table) {
         this.path = path;
         this.split = split;
+        this.table = table;
     }
 
     public DataSplit split() {
@@ -73,6 +78,10 @@ public class PaimonInputSplit extends FileSplit {
         return ANYWHERE;
     }
 
+    public FileStoreTable getTable() {
+        return table;
+    }
+
     @Override
     public void write(DataOutput dataOutput) throws IOException {
         dataOutput.writeUTF(path);
@@ -80,6 +89,17 @@ public class PaimonInputSplit extends FileSplit {
         split.serialize(out);
         dataOutput.writeInt(out.length());
         dataOutput.write(out.getCopyOfBuffer());
+        writeFileStoreTable(dataOutput);
+    }
+
+    private void writeFileStoreTable(DataOutput dataOutput) throws IOException 
{
+        if (table == null) {
+            dataOutput.writeInt(0);
+        } else {
+            byte[] bytes = InstantiationUtil.serializeObject(table);
+            dataOutput.writeInt(bytes.length);
+            dataOutput.write(bytes);
+        }
     }
 
     @Override
@@ -89,6 +109,22 @@ public class PaimonInputSplit extends FileSplit {
         byte[] bytes = new byte[length];
         dataInput.readFully(bytes);
         split = DataSplit.deserialize(new DataInputDeserializer(bytes));
+        readFileStoreTable(dataInput);
+    }
+
+    private void readFileStoreTable(DataInput dataInput) throws IOException {
+        int length = dataInput.readInt();
+        if (length > 0) {
+            byte[] bytes = new byte[length];
+            dataInput.readFully(bytes);
+            try {
+                table =
+                        InstantiationUtil.deserializeObject(
+                                bytes, 
Thread.currentThread().getContextClassLoader());
+            } catch (ClassNotFoundException e) {
+                throw new RuntimeException(e);
+            }
+        }
     }
 
     @Override
@@ -105,11 +141,13 @@ public class PaimonInputSplit extends FileSplit {
             return false;
         }
         PaimonInputSplit that = (PaimonInputSplit) o;
-        return Objects.equals(path, that.path) && Objects.equals(split, 
that.split);
+        return Objects.equals(path, that.path)
+                && Objects.equals(split, that.split)
+                && Objects.equals(table, that.table);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(path, split);
+        return Objects.hash(path, split, table);
     }
 }
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordReader.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordReader.java
index 857e3e805..beb868c5b 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordReader.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordReader.java
@@ -174,7 +174,8 @@ public class PaimonRecordReader implements 
RecordReader<Void, RowDataContainer>
     }
 
     public static RecordReader<Void, RowDataContainer> createRecordReader(
-            FileStoreTable table, PaimonInputSplit split, JobConf jobConf) 
throws IOException {
+            PaimonInputSplit split, JobConf jobConf) throws IOException {
+        FileStoreTable table = split.getTable();
         ReadBuilder readBuilder = table.newReadBuilder();
         createPredicate(table.schema(), jobConf, 
true).ifPresent(readBuilder::withFilter);
         List<String> paimonColumns = table.schema().fieldNames();
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
index 32b1e4745..6dab4d4af 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
@@ -96,7 +96,10 @@ public class HiveSplitGenerator {
             scan.plan()
                     .splits()
                     .forEach(
-                            split -> splits.add(new PaimonInputSplit(location, 
(DataSplit) split)));
+                            split ->
+                                    splits.add(
+                                            new PaimonInputSplit(
+                                                    location, (DataSplit) 
split, table)));
         }
         return splits.toArray(new InputSplit[0]);
     }
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java
index 908fce7c1..7d152c444 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java
@@ -19,8 +19,20 @@
 package org.apache.paimon.hive.mapred;
 
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.DataFileTestDataGenerator;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -29,8 +41,11 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 
@@ -62,8 +77,12 @@ public class PaimonInputSplitTest {
                                         .map(d -> d.meta)
                                         .collect(Collectors.toList()))
                         .build();
-        PaimonInputSplit split = new PaimonInputSplit(tempDir.toString(), 
dataSplit);
+        PaimonInputSplit split = new PaimonInputSplit(tempDir.toString(), 
dataSplit, null);
 
+        assertPaimonInputSplitSerialization(split);
+    }
+
+    private void assertPaimonInputSplitSerialization(PaimonInputSplit split) 
throws IOException {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         DataOutputStream output = new DataOutputStream(baos);
         split.write(output);
@@ -75,4 +94,37 @@ public class PaimonInputSplitTest {
         actual.readFields(input);
         assertThat(actual).isEqualTo(split);
     }
+
+    @Test
+    public void testWriteAndReadWithTable() throws Exception {
+        Path path = new Path(tempDir.toString());
+        SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), 
path);
+        schemaManager.createTable(
+                new Schema(
+                        RowType.of(VarCharType.STRING_TYPE).getFields(),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.emptyMap(),
+                        ""));
+
+        FileStoreTable fileStoreTable = 
FileStoreTableFactory.create(LocalFileIO.create(), path);
+        writeData(fileStoreTable);
+
+        DataSplit split = (DataSplit) 
fileStoreTable.newScan().plan().splits().get(0);
+
+        PaimonInputSplit paimonInputSplit =
+                new PaimonInputSplit(path.toString(), split, fileStoreTable);
+
+        assertPaimonInputSplitSerialization(paimonInputSplit);
+    }
+
+    private void writeData(FileStoreTable fileStoreTable) throws Exception {
+        String commitUser = UUID.randomUUID().toString();
+        TableWriteImpl<?> tableWrite = fileStoreTable.newWrite(commitUser);
+        tableWrite.write(GenericRow.of(BinaryString.fromString("1111")));
+        TableCommitImpl commit = fileStoreTable.newCommit(commitUser);
+        commit.commit(0, tableWrite.prepareCommit(true, 0));
+        tableWrite.close();
+        commit.close();
+    }
 }
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
index 8597a84dc..97cdc89eb 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
@@ -155,7 +155,7 @@ public class PaimonRecordReaderTest {
                 List<String> originalColumns = ((FileStoreTable) 
table).schema().fieldNames();
                 return new PaimonRecordReader(
                         table.newReadBuilder(),
-                        new PaimonInputSplit(tempDir.toString(), dataSplit),
+                        new PaimonInputSplit(tempDir.toString(), dataSplit, 
(FileStoreTable) table),
                         originalColumns,
                         originalColumns,
                         selectedColumns,

Reply via email to