This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5282a1196 [hive] Pass Table into Split to avoid loading it from the
FileSystem in createRecordReader (#2927)
5282a1196 is described below
commit 5282a119695503dad04b7a6b50290a72adaf20b5
Author: wgcn <[email protected]>
AuthorDate: Fri Mar 1 13:39:18 2024 +0800
[hive] Pass Table into Split to avoid loading it from the FileSystem in
createRecordReader (#2927)
---
.../paimon/hive/mapred/PaimonInputFormat.java | 3 +-
.../paimon/hive/mapred/PaimonInputSplit.java | 44 ++++++++++++++++--
.../paimon/hive/mapred/PaimonRecordReader.java | 3 +-
.../paimon/hive/utils/HiveSplitGenerator.java | 5 +-
.../paimon/hive/mapred/PaimonInputSplitTest.java | 54 +++++++++++++++++++++-
.../paimon/hive/mapred/PaimonRecordReaderTest.java | 2 +-
6 files changed, 102 insertions(+), 9 deletions(-)
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
index a58472a11..81fced03f 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
@@ -48,8 +48,7 @@ public class PaimonInputFormat implements InputFormat<Void,
RowDataContainer> {
@Override
public RecordReader<Void, RowDataContainer> getRecordReader(
InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws
IOException {
- FileStoreTable table = createFileStoreTable(jobConf);
PaimonInputSplit split = (PaimonInputSplit) inputSplit;
- return createRecordReader(table, split, jobConf);
+ return createRecordReader(split, jobConf);
}
}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputSplit.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputSplit.java
index 5b639808f..64cae0aef 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputSplit.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputSplit.java
@@ -21,7 +21,9 @@ package org.apache.paimon.hive.mapred;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataInputDeserializer;
import org.apache.paimon.io.DataOutputSerializer;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.utils.InstantiationUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
@@ -41,12 +43,15 @@ public class PaimonInputSplit extends FileSplit {
private String path;
private DataSplit split;
+ private FileStoreTable table;
+
// public no-argument constructor for deserialization
public PaimonInputSplit() {}
- public PaimonInputSplit(String path, DataSplit split) {
+ public PaimonInputSplit(String path, DataSplit split, FileStoreTable
table) {
this.path = path;
this.split = split;
+ this.table = table;
}
public DataSplit split() {
@@ -73,6 +78,10 @@ public class PaimonInputSplit extends FileSplit {
return ANYWHERE;
}
+ public FileStoreTable getTable() {
+ return table;
+ }
+
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(path);
@@ -80,6 +89,17 @@ public class PaimonInputSplit extends FileSplit {
split.serialize(out);
dataOutput.writeInt(out.length());
dataOutput.write(out.getCopyOfBuffer());
+ writeFileStoreTable(dataOutput);
+ }
+
+ private void writeFileStoreTable(DataOutput dataOutput) throws IOException
{
+ if (table == null) {
+ dataOutput.writeInt(0);
+ } else {
+ byte[] bytes = InstantiationUtil.serializeObject(table);
+ dataOutput.writeInt(bytes.length);
+ dataOutput.write(bytes);
+ }
}
@Override
@@ -89,6 +109,22 @@ public class PaimonInputSplit extends FileSplit {
byte[] bytes = new byte[length];
dataInput.readFully(bytes);
split = DataSplit.deserialize(new DataInputDeserializer(bytes));
+ readFileStoreTable(dataInput);
+ }
+
+ private void readFileStoreTable(DataInput dataInput) throws IOException {
+ int length = dataInput.readInt();
+ if (length > 0) {
+ byte[] bytes = new byte[length];
+ dataInput.readFully(bytes);
+ try {
+ table =
+ InstantiationUtil.deserializeObject(
+ bytes,
Thread.currentThread().getContextClassLoader());
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
@Override
@@ -105,11 +141,13 @@ public class PaimonInputSplit extends FileSplit {
return false;
}
PaimonInputSplit that = (PaimonInputSplit) o;
- return Objects.equals(path, that.path) && Objects.equals(split,
that.split);
+ return Objects.equals(path, that.path)
+ && Objects.equals(split, that.split)
+ && Objects.equals(table, that.table);
}
@Override
public int hashCode() {
- return Objects.hash(path, split);
+ return Objects.hash(path, split, table);
}
}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordReader.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordReader.java
index 857e3e805..beb868c5b 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordReader.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordReader.java
@@ -174,7 +174,8 @@ public class PaimonRecordReader implements
RecordReader<Void, RowDataContainer>
}
public static RecordReader<Void, RowDataContainer> createRecordReader(
- FileStoreTable table, PaimonInputSplit split, JobConf jobConf)
throws IOException {
+ PaimonInputSplit split, JobConf jobConf) throws IOException {
+ FileStoreTable table = split.getTable();
ReadBuilder readBuilder = table.newReadBuilder();
createPredicate(table.schema(), jobConf,
true).ifPresent(readBuilder::withFilter);
List<String> paimonColumns = table.schema().fieldNames();
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
index 32b1e4745..6dab4d4af 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
@@ -96,7 +96,10 @@ public class HiveSplitGenerator {
scan.plan()
.splits()
.forEach(
- split -> splits.add(new PaimonInputSplit(location,
(DataSplit) split)));
+ split ->
+ splits.add(
+ new PaimonInputSplit(
+ location, (DataSplit)
split, table)));
}
return splits.toArray(new InputSplit[0]);
}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java
index 908fce7c1..7d152c444 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java
@@ -19,8 +19,20 @@
package org.apache.paimon.hive.mapred;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileTestDataGenerator;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -29,8 +41,11 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
@@ -62,8 +77,12 @@ public class PaimonInputSplitTest {
.map(d -> d.meta)
.collect(Collectors.toList()))
.build();
- PaimonInputSplit split = new PaimonInputSplit(tempDir.toString(),
dataSplit);
+ PaimonInputSplit split = new PaimonInputSplit(tempDir.toString(),
dataSplit, null);
+ assertPaimonInputSplitSerialization(split);
+ }
+
+ private void assertPaimonInputSplitSerialization(PaimonInputSplit split)
throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream output = new DataOutputStream(baos);
split.write(output);
@@ -75,4 +94,37 @@ public class PaimonInputSplitTest {
actual.readFields(input);
assertThat(actual).isEqualTo(split);
}
+
+ @Test
+ public void testWriteAndReadWithTable() throws Exception {
+ Path path = new Path(tempDir.toString());
+ SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(),
path);
+ schemaManager.createTable(
+ new Schema(
+ RowType.of(VarCharType.STRING_TYPE).getFields(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyMap(),
+ ""));
+
+ FileStoreTable fileStoreTable =
FileStoreTableFactory.create(LocalFileIO.create(), path);
+ writeData(fileStoreTable);
+
+ DataSplit split = (DataSplit)
fileStoreTable.newScan().plan().splits().get(0);
+
+ PaimonInputSplit paimonInputSplit =
+ new PaimonInputSplit(path.toString(), split, fileStoreTable);
+
+ assertPaimonInputSplitSerialization(paimonInputSplit);
+ }
+
+ private void writeData(FileStoreTable fileStoreTable) throws Exception {
+ String commitUser = UUID.randomUUID().toString();
+ TableWriteImpl<?> tableWrite = fileStoreTable.newWrite(commitUser);
+ tableWrite.write(GenericRow.of(BinaryString.fromString("1111")));
+ TableCommitImpl commit = fileStoreTable.newCommit(commitUser);
+ commit.commit(0, tableWrite.prepareCommit(true, 0));
+ tableWrite.close();
+ commit.close();
+ }
}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
index 8597a84dc..97cdc89eb 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
@@ -155,7 +155,7 @@ public class PaimonRecordReaderTest {
List<String> originalColumns = ((FileStoreTable)
table).schema().fieldNames();
return new PaimonRecordReader(
table.newReadBuilder(),
- new PaimonInputSplit(tempDir.toString(), dataSplit),
+ new PaimonInputSplit(tempDir.toString(), dataSplit,
(FileStoreTable) table),
originalColumns,
originalColumns,
selectedColumns,