This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 632a54728 [core] Improve Avro read write performance (#1501)
632a54728 is described below
commit 632a5472877b4803085371f905a2437f89d60845
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jul 5 20:20:40 2023 +0800
[core] Improve Avro read write performance (#1501)
---
.../paimon/benchmark/TableWriterBenchmark.java | 17 +-
.../apache/paimon/format/TableStatsCollector.java | 10 +
.../apache/paimon/format/FormatReadWriteTest.java | 179 +++++++
.../paimon/io/StatsCollectingSingleFileWriter.java | 2 +-
...ractAvroBulkFormat.java => AvroBulkFormat.java} | 104 ++---
.../apache/paimon/format/avro/AvroFileFormat.java | 65 +--
.../paimon/format/avro/AvroRowDatumReader.java | 68 +++
.../paimon/format/avro/AvroRowDatumWriter.java | 61 +++
.../paimon/format/avro/AvroSchemaVisitor.java | 142 ++++++
.../org/apache/paimon/format/avro/FieldReader.java | 31 ++
.../paimon/format/avro/FieldReaderFactory.java | 514 +++++++++++++++++++++
.../org/apache/paimon/format/avro/FieldWriter.java | 31 ++
.../paimon/format/avro/FieldWriterFactory.java | 226 +++++++++
.../format/avro/RowDataToAvroConverters.java | 329 -------------
.../paimon/format/avro/AvroBulkFormatTest.java | 160 -------
.../format/avro/AvroBulkFormatTestUtils.java | 76 ---
.../format/avro/AvroFormatReadWriteTest.java | 32 ++
.../format/avro/AvroToRowDataConvertersTest.java | 47 --
.../format/avro/RowDataToAvroConvertersTest.java | 57 ---
19 files changed, 1350 insertions(+), 801 deletions(-)
diff --git
a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java
b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java
index 85340f194..2594420ca 100644
---
a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java
+++
b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java
@@ -43,7 +43,22 @@ public class TableWriterBenchmark extends TableBenchmark {
* Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
* avro: Best/Avg Time(ms) Row Rate(M/s) Per
Row(ns) Relative
*
---------------------------------------------------------------------------------
- * avro_write 10139 / 13044 0.0 33797.3
1.0X
+ * avro_write 5847 / 7296 0.1 19489.5
1.0X
+ */
+ }
+
+ @Test
+ public void testAvroWithoutStats() throws Exception {
+ Options options = new Options();
+ options.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
+ options.set(CoreOptions.METADATA_STATS_MODE, "none");
+ innerTest("avro", options);
+ /*
+ * Java HotSpot(TM) 64-Bit Server VM 1.8.0_301-b09 on Mac OS X 10.16
+ * Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
+ * avro: Best/Avg Time(ms) Row Rate(M/s) Per
Row(ns) Relative
+ *
---------------------------------------------------------------------------------
+ * avro_write 4701 / 5780 0.1 15669.6
1.0X
*/
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/format/TableStatsCollector.java
b/paimon-common/src/main/java/org/apache/paimon/format/TableStatsCollector.java
index 6f8661815..25f00d743 100644
---
a/paimon-common/src/main/java/org/apache/paimon/format/TableStatsCollector.java
+++
b/paimon-common/src/main/java/org/apache/paimon/format/TableStatsCollector.java
@@ -22,9 +22,12 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalSerializers;
import org.apache.paimon.data.serializer.Serializer;
import org.apache.paimon.statistics.FieldStatsCollector;
+import org.apache.paimon.statistics.NoneFieldStatsCollector;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
+import java.util.Arrays;
+
import static
org.apache.paimon.statistics.FieldStatsCollector.createFullStatsFactories;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -34,6 +37,7 @@ public class TableStatsCollector {
private final RowDataToObjectArrayConverter converter;
private final FieldStatsCollector[] statsCollectors;
private final Serializer<Object>[] fieldSerializers;
+ private final boolean isDisabled;
public TableStatsCollector(RowType rowType) {
this(rowType, createFullStatsFactories(rowType.getFieldCount()));
@@ -52,6 +56,12 @@ public class TableStatsCollector {
for (int i = 0; i < numFields; i++) {
fieldSerializers[i] =
InternalSerializers.create(rowType.getTypeAt(i));
}
+ this.isDisabled =
+ Arrays.stream(statsCollectors).allMatch(p -> p instanceof
NoneFieldStatsCollector);
+ }
+
+ public boolean isDisabled() {
+ return isDisabled;
}
/**
diff --git
a/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
new file mode 100644
index 000000000..cd2fd36da
--- /dev/null
+++
b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format;
+
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericMap;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.paimon.data.BinaryString.fromString;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test Base class for Format. */
+public abstract class FormatReadWriteTest {
+
+ @TempDir java.nio.file.Path tempPath;
+
+ private FileIO fileIO;
+ private Path file;
+
+ @BeforeEach
+ public void beforeEach() {
+ this.fileIO = LocalFileIO.create();
+ this.file = new Path(new Path(tempPath.toUri()),
UUID.randomUUID().toString());
+ }
+
+ protected abstract FileFormat fileFormat();
+
+ @Test
+ public void testSimpleTypes() throws IOException {
+ RowType rowType = DataTypes.ROW(DataTypes.INT().notNull(),
DataTypes.BIGINT());
+
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ rowType = (RowType) rowType.notNull();
+ }
+
+ InternalRowSerializer serializer = new InternalRowSerializer(rowType);
+ FileFormat format = fileFormat();
+
+ PositionOutputStream out = fileIO.newOutputStream(file, false);
+ FormatWriter writer = format.createWriterFactory(rowType).create(out,
null);
+ writer.addElement(GenericRow.of(1, 1L));
+ writer.addElement(GenericRow.of(2, 2L));
+ writer.addElement(GenericRow.of(3, null));
+ writer.flush();
+ writer.finish();
+ out.close();
+
+ RecordReader<InternalRow> reader =
+ format.createReaderFactory(rowType).createReader(fileIO, file);
+ List<InternalRow> result = new ArrayList<>();
+ reader.forEachRemaining(row -> result.add(serializer.copy(row)));
+
+ assertThat(result)
+ .containsExactly(
+ GenericRow.of(1, 1L), GenericRow.of(2, 2L),
GenericRow.of(3, null));
+ }
+
+ @Test
+ public void testFullTypes() throws IOException {
+ RowType rowType =
+ RowType.builder()
+ .field("id", DataTypes.INT().notNull())
+ .field("name", DataTypes.STRING()) /* optional by
default */
+ .field("salary", DataTypes.DOUBLE().notNull())
+ .field(
+ "locations",
+ DataTypes.MAP(
+ DataTypes.STRING().notNull(),
+ DataTypes.ROW(
+ DataTypes.FIELD(
+ 0,
+ "posX",
+
DataTypes.DOUBLE().notNull(),
+ "X field"),
+ DataTypes.FIELD(
+ 1,
+ "posY",
+
DataTypes.DOUBLE().notNull(),
+ "Y field"))))
+ .field("strArray",
DataTypes.ARRAY(DataTypes.STRING()).nullable())
+ .field("intArray",
DataTypes.ARRAY(DataTypes.INT()).nullable())
+ .field("boolean", DataTypes.BOOLEAN().nullable())
+ .field("tinyint", DataTypes.TINYINT())
+ .field("smallint", DataTypes.SMALLINT())
+ .field("bigint", DataTypes.BIGINT())
+ .field("bytes", DataTypes.BYTES())
+ .field("timestamp", DataTypes.TIMESTAMP())
+ .field("timestamp_3", DataTypes.TIMESTAMP(3))
+ .field("date", DataTypes.DATE())
+ .field("decimal", DataTypes.DECIMAL(2, 2))
+ .field("decimal2", DataTypes.DECIMAL(38, 2))
+ .field("decimal3", DataTypes.DECIMAL(10, 1))
+ .build();
+
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ rowType = (RowType) rowType.notNull();
+ }
+
+ FileFormat format = fileFormat();
+
+ PositionOutputStream out = fileIO.newOutputStream(file, false);
+ FormatWriter writer = format.createWriterFactory(rowType).create(out,
null);
+ GenericRow expected =
+ GenericRow.of(
+ 1,
+ fromString("name"),
+ 5.26D,
+ new GenericMap(
+ new HashMap<Object, Object>() {
+ {
+ this.put(fromString("key1"),
GenericRow.of(5.2D, 6.2D));
+ this.put(fromString("key2"),
GenericRow.of(6.2D, 2.2D));
+ }
+ }),
+ new GenericArray(new Object[] {fromString("123"),
fromString("456")}),
+ new GenericArray(new Object[] {123, 456}),
+ true,
+ (byte) 3,
+ (short) 6,
+ 12304L,
+ new byte[] {1, 5, 2},
+ Timestamp.fromMicros(123123123),
+ Timestamp.fromEpochMillis(123123123),
+ 2456,
+ Decimal.fromBigDecimal(new BigDecimal(0.22), 2, 2),
+ Decimal.fromBigDecimal(new BigDecimal(12312455.22),
38, 2),
+ Decimal.fromBigDecimal(new BigDecimal(12455.1), 10,
1));
+ writer.addElement(expected);
+ writer.flush();
+ writer.finish();
+ out.close();
+
+ RecordReader<InternalRow> reader =
+ format.createReaderFactory(rowType).createReader(fileIO, file);
+ List<InternalRow> result = new ArrayList<>();
+ reader.forEachRemaining(result::add);
+
+ assertThat(result).containsExactly(expected);
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
index 559b0f181..a5de36634 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
@@ -68,7 +68,7 @@ public abstract class StatsCollectingSingleFileWriter<T, R>
extends SingleFileWr
@Override
public void write(T record) throws IOException {
InternalRow rowData = writeImpl(record);
- if (tableStatsCollector != null) {
+ if (tableStatsCollector != null && !tableStatsCollector.isDisabled()) {
tableStatsCollector.collect(rowData);
}
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AbstractAvroBulkFormat.java
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
similarity index 56%
rename from
paimon-format/src/main/java/org/apache/paimon/format/avro/AbstractAvroBulkFormat.java
rename to
paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
index 2ce1f7c18..0f5867e64 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AbstractAvroBulkFormat.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
@@ -23,95 +23,62 @@ import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.IteratorResultIterator;
import org.apache.paimon.utils.Pool;
-import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.SeekableInput;
-import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.DatumReader;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Iterator;
-import java.util.function.Function;
/** Provides a {@link FormatReaderFactory} for Avro records. */
-public abstract class AbstractAvroBulkFormat<A> implements FormatReaderFactory
{
+public class AvroBulkFormat implements FormatReaderFactory {
private static final long serialVersionUID = 1L;
- protected final Schema readerSchema;
+ protected final RowType rowType;
+ private final int[] projection;
- protected AbstractAvroBulkFormat(Schema readerSchema) {
- this.readerSchema = readerSchema;
+ public AvroBulkFormat(RowType rowType, int[] projection) {
+ this.rowType = rowType;
+ this.projection = projection;
}
@Override
public AvroReader createReader(FileIO fileIO, Path file) throws
IOException {
- return createReader(fileIO, file, createReusedAvroRecord(),
createConverter());
+ return new AvroReader(fileIO, file);
}
- private AvroReader createReader(
- FileIO fileIO, Path file, A reuse, Function<A, InternalRow>
converter)
- throws IOException {
- return new AvroReader(fileIO, file, 0, fileIO.getFileSize(file), -1,
0, reuse, converter);
- }
-
- protected abstract A createReusedAvroRecord();
-
- protected abstract Function<A, InternalRow> createConverter();
-
private class AvroReader implements RecordReader<InternalRow> {
private final FileIO fileIO;
- private final DataFileReader<A> reader;
- private final Function<A, InternalRow> converter;
+ private final DataFileReader<InternalRow> reader;
private final long end;
- private final Pool<A> pool;
-
- private long currentRecordsToSkip;
-
- private AvroReader(
- FileIO fileIO,
- Path path,
- long offset,
- long end,
- long blockStart,
- long recordsToSkip,
- A reuse,
- Function<A, InternalRow> converter)
- throws IOException {
+ private final Pool<Object> pool;
+
+ private AvroReader(FileIO fileIO, Path path) throws IOException {
this.fileIO = fileIO;
this.reader = createReaderFromPath(path);
- if (blockStart >= 0) {
- reader.seek(blockStart);
- } else {
- reader.sync(offset);
- }
- for (int i = 0; i < recordsToSkip; i++) {
- reader.next(reuse);
- }
- this.converter = converter;
-
- this.end = end;
+ this.reader.sync(0);
+ this.end = fileIO.getFileSize(path);
this.pool = new Pool<>(1);
- this.pool.add(reuse);
-
- this.currentRecordsToSkip = recordsToSkip;
+ this.pool.add(new Object());
}
- private DataFileReader<A> createReaderFromPath(Path path) throws
IOException {
- DatumReader<A> datumReader = new GenericDatumReader<>(null,
readerSchema);
+ private DataFileReader<InternalRow> createReaderFromPath(Path path)
throws IOException {
+ DatumReader<InternalRow> datumReader = new
AvroRowDatumReader(rowType, projection);
SeekableInput in =
new SeekableInputStreamWrapper(
fileIO.newInputStream(path),
fileIO.getFileSize(path));
try {
- return (DataFileReader<A>) DataFileReader.openReader(in,
datumReader);
+ return (DataFileReader<InternalRow>)
DataFileReader.openReader(in, datumReader);
} catch (Throwable e) {
IOUtils.closeQuietly(in);
throw e;
@@ -121,9 +88,9 @@ public abstract class AbstractAvroBulkFormat<A> implements
FormatReaderFactory {
@Nullable
@Override
public RecordIterator<InternalRow> readBatch() throws IOException {
- A reuse;
+ Object ticket;
try {
- reuse = pool.pollEntry();
+ ticket = pool.pollEntry();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(
@@ -131,18 +98,12 @@ public abstract class AbstractAvroBulkFormat<A> implements
FormatReaderFactory {
}
if (!readNextBlock()) {
- pool.recycler().recycle(reuse);
+ pool.recycler().recycle(ticket);
return null;
}
- Iterator<InternalRow> iterator =
- new AvroBlockIterator(
- reader.getBlockCount() - currentRecordsToSkip,
- reader,
- reuse,
- converter);
- currentRecordsToSkip = 0;
- return new IteratorResultIterator<>(iterator, () ->
pool.recycler().recycle(reuse));
+ Iterator<InternalRow> iterator = new
AvroBlockIterator(reader.getBlockCount(), reader);
+ return new IteratorResultIterator<>(iterator, () ->
pool.recycler().recycle(ticket));
}
private boolean readNextBlock() throws IOException {
@@ -157,22 +118,14 @@ public abstract class AbstractAvroBulkFormat<A>
implements FormatReaderFactory {
}
}
- private class AvroBlockIterator implements Iterator<InternalRow> {
+ private static class AvroBlockIterator implements Iterator<InternalRow> {
private long numRecordsRemaining;
- private final DataFileReader<A> reader;
- private final A reuse;
- private final Function<A, InternalRow> converter;
-
- private AvroBlockIterator(
- long numRecordsRemaining,
- DataFileReader<A> reader,
- A reuse,
- Function<A, InternalRow> converter) {
+ private final DataFileReader<InternalRow> reader;
+
+ private AvroBlockIterator(long numRecordsRemaining,
DataFileReader<InternalRow> reader) {
this.numRecordsRemaining = numRecordsRemaining;
this.reader = reader;
- this.reuse = reuse;
- this.converter = converter;
}
@Override
@@ -186,7 +139,8 @@ public abstract class AbstractAvroBulkFormat<A> implements
FormatReaderFactory {
numRecordsRemaining--;
// reader.next merely deserialize bytes in memory to java
objects
// and will not read from file
- return converter.apply(reader.next(reuse));
+ // Do not reuse object, manifest file assumes no object reuse
+ return reader.next(null);
} catch (IOException e) {
throw new RuntimeException(
"Encountered exception when reading from avro format
file", e);
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
index 11d4d3713..fffbaec9b 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
@@ -18,7 +18,6 @@
package org.apache.paimon.format.avro;
-import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FormatReaderFactory;
@@ -36,16 +35,11 @@ import org.apache.paimon.utils.Projection;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumWriter;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
-import java.util.function.Function;
import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC;
@@ -70,17 +64,12 @@ public class AvroFileFormat extends FileFormat {
@Override
public FormatReaderFactory createReaderFactory(
RowType type, int[][] projection, @Nullable List<Predicate>
filters) {
- // avro is a file format that keeps schemas in file headers,
- // if the schema given to the reader is not equal to the schema in
header,
- // reader will automatically map the fields and give back records with
our desired
- // schema
- DataType producedType = Projection.of(projection).project(type);
- return new AvroGenericRecordBulkFormat((RowType)
producedType.copy(false));
+ return new AvroBulkFormat(type,
Projection.of(projection).toTopLevelIndexes());
}
@Override
public FormatWriterFactory createWriterFactory(RowType type) {
- return new RowDataAvroWriterFactory(type,
formatOptions.get(AVRO_OUTPUT_CODEC));
+ return new RowAvroWriterFactory(type,
formatOptions.get(AVRO_OUTPUT_CODEC));
}
@Override
@@ -91,48 +80,18 @@ public class AvroFileFormat extends FileFormat {
}
}
- private static class AvroGenericRecordBulkFormat extends
AbstractAvroBulkFormat<GenericRecord> {
+ /** A {@link FormatWriterFactory} to write {@link InternalRow}. */
+ private static class RowAvroWriterFactory implements FormatWriterFactory {
- private static final long serialVersionUID = 1L;
+ private final AvroWriterFactory<InternalRow> factory;
- private final RowType producedRowType;
-
- public AvroGenericRecordBulkFormat(RowType producedRowType) {
- super(AvroSchemaConverter.convertToSchema(producedRowType));
- this.producedRowType = producedRowType;
- }
-
- @Override
- protected GenericRecord createReusedAvroRecord() {
- return new GenericData.Record(readerSchema);
- }
-
- @Override
- protected Function<GenericRecord, InternalRow> createConverter() {
- AvroToRowDataConverters.AvroToRowDataConverter converter =
-
AvroToRowDataConverters.createRowConverter(producedRowType);
- return record -> record == null ? null : (GenericRow)
converter.convert(record);
- }
- }
-
- /**
- * A {@link FormatWriterFactory} to convert {@link InternalRow} to {@link
GenericRecord} and
- * wrap {@link AvroWriterFactory}.
- */
- private static class RowDataAvroWriterFactory implements
FormatWriterFactory {
-
- private final AvroWriterFactory<GenericRecord> factory;
- private final RowType rowType;
-
- private RowDataAvroWriterFactory(RowType rowType, String codec) {
- this.rowType = rowType;
+ private RowAvroWriterFactory(RowType rowType, String codec) {
this.factory =
new AvroWriterFactory<>(
out -> {
Schema schema =
AvroSchemaConverter.convertToSchema(rowType);
- DatumWriter<GenericRecord> datumWriter =
- new GenericDatumWriter<>(schema);
- DataFileWriter<GenericRecord> dataFileWriter =
+ AvroRowDatumWriter datumWriter = new
AvroRowDatumWriter(rowType);
+ DataFileWriter<InternalRow> dataFileWriter =
new DataFileWriter<>(datumWriter);
if (codec != null) {
@@ -146,16 +105,12 @@ public class AvroFileFormat extends FileFormat {
@Override
public FormatWriter create(PositionOutputStream out, String
compression)
throws IOException {
- AvroBulkWriter<GenericRecord> writer = factory.create(out);
- RowDataToAvroConverters.RowDataToAvroConverter converter =
- RowDataToAvroConverters.createConverter(rowType);
- Schema schema = AvroSchemaConverter.convertToSchema(rowType);
+ AvroBulkWriter<InternalRow> writer = factory.create(out);
return new FormatWriter() {
@Override
public void addElement(InternalRow element) throws IOException
{
- GenericRecord record = (GenericRecord)
converter.convert(schema, element);
- writer.addElement(record);
+ writer.addElement(element);
}
@Override
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumReader.java
new file mode 100644
index 000000000..db5cc8b12
--- /dev/null
+++
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumReader.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.avro;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.avro.FieldReaderFactory.RowReader;
+import org.apache.paimon.types.RowType;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+
+import java.io.IOException;
+
+/** A {@link DatumReader} for reading {@link InternalRow}. */
+public class AvroRowDatumReader implements DatumReader<InternalRow> {
+
+ private final RowType rowType;
+ private final int[] projection;
+
+ private RowReader reader;
+ private boolean isUnion;
+
+ public AvroRowDatumReader(RowType rowType, int[] projection) {
+ this.rowType = rowType;
+ this.projection = projection;
+ }
+
+ @Override
+ public void setSchema(Schema schema) {
+ this.isUnion = false;
+ if (schema.isUnion()) {
+ this.isUnion = true;
+ schema = schema.getTypes().get(1);
+ }
+ this.reader =
+ new FieldReaderFactory()
+ .createRowReader(schema, rowType.getFieldTypes(),
projection);
+ }
+
+ @Override
+ public InternalRow read(InternalRow reuse, Decoder in) throws IOException {
+ if (isUnion) {
+ int index = in.readIndex();
+ if (index == 0) {
+ throw new RuntimeException("Cannot read a null row.");
+ }
+ }
+
+ return reader.read(in, reuse);
+ }
+}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumWriter.java
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumWriter.java
new file mode 100644
index 000000000..ef6643bd1
--- /dev/null
+++
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumWriter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.avro;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.avro.FieldWriterFactory.RowWriter;
+import org.apache.paimon.types.RowType;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+
+import java.io.IOException;
+
+/** A {@link DatumWriter} for writing {@link InternalRow}. */
+public class AvroRowDatumWriter implements DatumWriter<InternalRow> {
+
+ private final RowType rowType;
+
+ private RowWriter writer;
+ private boolean isUnion;
+
+ public AvroRowDatumWriter(RowType rowType) {
+ this.rowType = rowType;
+ }
+
+ @Override
+ public void setSchema(Schema schema) {
+ this.isUnion = false;
+ if (schema.isUnion()) {
+ this.isUnion = true;
+ schema = schema.getTypes().get(1);
+ }
+ this.writer = new FieldWriterFactory().createRowWriter(schema,
rowType.getFieldTypes());
+ }
+
+ @Override
+ public void write(InternalRow datum, Encoder out) throws IOException {
+ if (isUnion) {
+ // top Row is a UNION type
+ out.writeIndex(1);
+ }
+ this.writer.writeRow(datum, out);
+ }
+}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaVisitor.java
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaVisitor.java
new file mode 100644
index 000000000..e30800ab9
--- /dev/null
+++
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaVisitor.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.avro;
+
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+
+import java.util.List;
+
+import static org.apache.paimon.types.DataTypeChecks.getPrecision;
+
+/** Visitor to visit {@link Schema}. */
+public interface AvroSchemaVisitor<T> {
+
+ default T visit(Schema schema, DataType type) {
+ switch (schema.getType()) {
+ case RECORD:
+ return visitRecord(schema, ((RowType) type).getFieldTypes());
+
+ case UNION:
+ return visitUnion(schema, type);
+
+ case ARRAY:
+ return visitArray(schema, ((ArrayType) type).getElementType());
+
+ case MAP:
+ DataType valueType = DataTypes.INT();
+ if (type instanceof MapType) {
+ valueType = ((MapType) type).getValueType();
+ }
+ return visitMap(schema, valueType);
+
+ default:
+ return primitive(schema, type);
+ }
+ }
+
+ default T primitive(Schema primitive, DataType type) {
+ LogicalType logicalType = primitive.getLogicalType();
+ if (logicalType != null) {
+ switch (logicalType.getName()) {
+ case "date":
+ case "time-millis":
+ return visitInt();
+
+ case "timestamp-millis":
+ return visitTimestampMillis(getPrecision(type));
+
+ case "timestamp-micros":
+ return visitTimestampMicros(getPrecision(type));
+
+ case "decimal":
+ LogicalTypes.Decimal decimal = (LogicalTypes.Decimal)
logicalType;
+ return visitDecimal(decimal.getPrecision(),
decimal.getScale());
+
+ default:
+ throw new IllegalArgumentException("Unknown logical type:
" + logicalType);
+ }
+ }
+
+ switch (primitive.getType()) {
+ case BOOLEAN:
+ return visitBoolean();
+ case INT:
+ switch (type.getTypeRoot()) {
+ case TINYINT:
+ return visitTinyInt();
+ case SMALLINT:
+ return visitSmallInt();
+ default:
+ return visitInt();
+ }
+ case LONG:
+ return visitBigInt();
+ case FLOAT:
+ return visitFloat();
+ case DOUBLE:
+ return visitDouble();
+ case STRING:
+ return visitString();
+ case BYTES:
+ return visitBytes();
+ default:
+ throw new IllegalArgumentException("Unsupported type: " +
primitive);
+ }
+ }
+
+ T visitUnion(Schema schema, DataType type);
+
+ T visitString();
+
+ T visitBytes();
+
+ T visitInt();
+
+ T visitTinyInt();
+
+ T visitSmallInt();
+
+ T visitBoolean();
+
+ T visitBigInt();
+
+ T visitFloat();
+
+ T visitDouble();
+
+ T visitTimestampMillis(int precision);
+
+ T visitTimestampMicros(int precision);
+
+ T visitDecimal(int precision, int scale);
+
+ T visitArray(Schema schema, DataType elementType);
+
+ T visitMap(Schema schema, DataType valueType);
+
+ T visitRecord(Schema schema, List<DataType> fieldTypes);
+}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReader.java
new file mode 100644
index 000000000..37402b835
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReader.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.avro;
+
+import org.apache.avro.io.Decoder;
+
+import java.io.IOException;
+
/** Reader to read a single field value from an Avro binary {@link Decoder}. */
public interface FieldReader {

    /**
     * Reads the next value from the decoder.
     *
     * @param decoder the Avro decoder positioned at this field
     * @param reuse a previously returned object that MAY be reused to reduce allocation;
     *     implementations are free to ignore it
     * @return the decoded value as a Paimon internal data structure
     */
    Object read(Decoder decoder, Object reuse) throws IOException;

    /**
     * Skips the next value without materializing it. Must consume exactly the same bytes
     * as {@link #read} would for the decoder stream to stay aligned.
     */
    void skip(Decoder decoder) throws IOException;
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReaderFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReaderFactory.java
new file mode 100644
index 000000000..dfee0d8da
--- /dev/null
+++
b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReaderFactory.java
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.avro;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericMap;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.DataType;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.util.Utf8;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+
/**
 * Factory to create {@link FieldReader}s by visiting an Avro schema alongside its Paimon
 * {@link DataType}s.
 *
 * <p>Stateless readers are shared singletons; stateful ones ({@link ArrayReader},
 * {@link MapReader}, {@link RowReader}) hold reusable buffers and are therefore NOT
 * thread-safe — each reading thread needs its own instance tree.
 */
public class FieldReaderFactory implements AvroSchemaVisitor<FieldReader> {

    private static final FieldReader STRING_READER = new StringReader();

    private static final FieldReader BYTES_READER = new BytesReader();

    private static final FieldReader BOOLEAN_READER = new BooleanReader();

    private static final FieldReader TINYINT_READER = new TinyIntReader();

    private static final FieldReader SMALLINT_READER = new SmallIntReader();

    private static final FieldReader INT_READER = new IntReader();

    private static final FieldReader BIGINT_READER = new BigIntReader();

    private static final FieldReader FLOAT_READER = new FloatReader();

    private static final FieldReader DOUBLE_READER = new DoubleReader();

    private static final FieldReader TIMESTAMP_MILLS_READER = new TimestampMillsReader();

    private static final FieldReader TIMESTAMP_MICROS_READER = new TimestampMicrosReader();

    @Override
    public FieldReader visitUnion(Schema schema, DataType type) {
        // Assumes the union has the shape ["null", actualType] (null branch first) as
        // produced by the schema converter — TODO(review): confirm the converter never
        // emits the reversed order.
        return new NullableReader(visit(schema.getTypes().get(1), type));
    }

    @Override
    public FieldReader visitString() {
        return STRING_READER;
    }

    @Override
    public FieldReader visitBytes() {
        return BYTES_READER;
    }

    @Override
    public FieldReader visitInt() {
        return INT_READER;
    }

    @Override
    public FieldReader visitTinyInt() {
        return TINYINT_READER;
    }

    @Override
    public FieldReader visitSmallInt() {
        return SMALLINT_READER;
    }

    @Override
    public FieldReader visitBoolean() {
        return BOOLEAN_READER;
    }

    @Override
    public FieldReader visitBigInt() {
        return BIGINT_READER;
    }

    @Override
    public FieldReader visitFloat() {
        return FLOAT_READER;
    }

    @Override
    public FieldReader visitDouble() {
        return DOUBLE_READER;
    }

    @Override
    public FieldReader visitTimestampMillis(int precision) {
        // Precision is implied by the millis encoding; nothing extra to carry.
        return TIMESTAMP_MILLS_READER;
    }

    @Override
    public FieldReader visitTimestampMicros(int precision) {
        return TIMESTAMP_MICROS_READER;
    }

    @Override
    public FieldReader visitDecimal(int precision, int scale) {
        return new DecimalReader(precision, scale);
    }

    @Override
    public FieldReader visitArray(Schema schema, DataType elementType) {
        FieldReader elementReader = visit(schema.getElementType(), elementType);
        return new ArrayReader(elementReader);
    }

    @Override
    public FieldReader visitMap(Schema schema, DataType valueType) {
        // Avro map keys are always strings; only the value needs a dedicated reader.
        FieldReader valueReader = visit(schema.getValueType(), valueType);
        return new MapReader(valueReader);
    }

    @Override
    public FieldReader visitRecord(Schema schema, List<DataType> fieldTypes) {
        return new RowReader(schema, fieldTypes);
    }

    /** Reader for a nullable (union) field: branch index 0 is null, index 1 the value. */
    private static class NullableReader implements FieldReader {

        private final FieldReader reader;

        public NullableReader(FieldReader reader) {
            this.reader = reader;
        }

        @Override
        public Object read(Decoder decoder, Object reuse) throws IOException {
            int index = decoder.readIndex();
            return index == 0 ? null : reader.read(decoder, reuse);
        }

        @Override
        public void skip(Decoder decoder) throws IOException {
            int index = decoder.readIndex();
            if (index == 1) {
                // Only the non-null branch has payload bytes to skip.
                reader.skip(decoder);
            }
        }
    }

    /** Reads an Avro string into a {@link BinaryString}, reusing the caller's buffer if any. */
    private static class StringReader implements FieldReader {

        @Override
        public Object read(Decoder decoder, Object reuse) throws IOException {
            // Wrap a reused BinaryString's bytes in a Utf8 so the decoder can reuse
            // its backing buffer instead of allocating a new one.
            Utf8 utf8 = null;
            if (reuse instanceof BinaryString) {
                utf8 = new Utf8(((BinaryString) reuse).toBytes());
            }

            Utf8 string = decoder.readString(utf8);
            return BinaryString.fromBytes(string.getBytes(), 0, string.getByteLength());
        }

        @Override
        public void skip(Decoder decoder) throws IOException {
            decoder.skipString();
        }
    }

    /** Reads an Avro bytes field into a fresh {@code byte[]} (reuse is ignored). */
    private static class BytesReader implements FieldReader {

        @Override
        public Object read(Decoder decoder, Object reuse) throws IOException {
            return decoder.readBytes(null).array();
        }

        @Override
        public void skip(Decoder decoder) throws IOException {
            decoder.skipBytes();
        }
    }

    /** Reads an Avro boolean. */
    private static class BooleanReader implements FieldReader {

        @Override
        public Object read(Decoder decoder, Object reuse) throws IOException {
            return decoder.readBoolean();
        }

        @Override
        public void skip(Decoder decoder) throws IOException {
            // Booleans are a fixed single byte; reading is the cheapest skip.
            decoder.readBoolean();
        }
    }

    /** Reads an Avro int narrowed to a Paimon TINYINT ({@code byte}). */
    private static class TinyIntReader implements FieldReader {

        @Override
        public Object read(Decoder decoder, Object reuse) throws IOException {
            return (byte) decoder.readInt();
        }

        @Override
        public void skip(Decoder decoder) throws IOException {
            decoder.readInt();
        }
    }

    /** Reads an Avro int narrowed to a Paimon SMALLINT ({@code short}). */
    private static class SmallIntReader implements FieldReader {

        @Override
        public Object read(Decoder decoder, Object reuse) throws IOException {
            return (short) decoder.readInt();
        }

        @Override
        public void skip(Decoder decoder) throws IOException {
            decoder.readInt();
        }
    }

    /** Reads an Avro int. */
    private static class IntReader implements FieldReader {

        @Override
        public Object read(Decoder decoder, Object reuse) throws IOException {
            return decoder.readInt();
        }

        @Override
        public void skip(Decoder decoder) throws IOException {
            decoder.readInt();
        }
    }

    /** Reads an Avro long. */
    private static class BigIntReader implements FieldReader {

        @Override
        public Object read(Decoder decoder, Object reuse) throws IOException {
            return decoder.readLong();
        }

        @Override
        public void skip(Decoder decoder) throws IOException {
            decoder.readLong();
        }
    }

    /** Reads an Avro float. */
    private static class FloatReader implements FieldReader {

        @Override
        public Object read(Decoder decoder, Object reuse) throws IOException {
            return decoder.readFloat();
        }

        @Override
        public void skip(Decoder decoder) throws IOException {
            decoder.readFloat();
        }
    }

    /** Reads an Avro double. */
    private static class DoubleReader implements FieldReader {

        @Override
        public Object read(Decoder decoder, Object reuse) throws IOException {
            return decoder.readDouble();
        }

        @Override
        public void skip(Decoder decoder) throws IOException {
            decoder.readDouble();
        }
    }

    /**
     * Reads an Avro decimal: a bytes field holding the unscaled value as a big-endian
     * two's-complement integer (the {@code new BigInteger(byte[])} interpretation).
     */
    private static class DecimalReader implements FieldReader {

        private final int precision;
        private final int scale;

        private DecimalReader(int precision, int scale) {
            this.precision = precision;
            this.scale = scale;
        }

        @Override
        public Object read(Decoder decoder, Object reuse) throws IOException {
            byte[] bytes = (byte[]) BYTES_READER.read(decoder, null);
            return Decimal.fromBigDecimal(
                    new BigDecimal(new BigInteger(bytes), scale), precision, scale);
        }

        @Override
        public void skip(Decoder decoder) throws IOException {
            BYTES_READER.skip(decoder);
        }
    }

    /** Reads a {@code timestamp-millis} long into a {@link Timestamp}. */
    private static class TimestampMillsReader implements FieldReader {

        @Override
        public Object read(Decoder decoder, Object reuse) throws IOException {
            return Timestamp.fromEpochMillis(decoder.readLong());
        }

        @Override
        public void skip(Decoder decoder) throws IOException {
            decoder.readLong();
        }
    }

    /** Reads a {@code timestamp-micros} long into a {@link Timestamp}. */
    private static class TimestampMicrosReader implements FieldReader {

        @Override
        public Object read(Decoder decoder, Object reuse) throws IOException {
            return Timestamp.fromMicros(decoder.readLong());
        }

        @Override
        public void skip(Decoder decoder) throws IOException {
            decoder.readLong();
        }
    }

    /**
     * Reads an Avro array using its chunked wire format (block length, items, next block...).
     * Holds a reusable element buffer, so instances are NOT thread-safe.
     */
    private static class ArrayReader implements FieldReader {

        private final FieldReader elementReader;
        private final List<Object> reusedList = new ArrayList<>();

        private ArrayReader(FieldReader elementReader) {
            this.elementReader = elementReader;
        }

        @Override
        public Object read(Decoder decoder, Object reuse) throws IOException {
            reusedList.clear();
            long chunkLength = decoder.readArrayStart();

            // Avro arrays arrive in chunks; arrayNext() returns 0 at the end.
            while (chunkLength > 0) {
                for (int i = 0; i < chunkLength; i += 1) {
                    reusedList.add(elementReader.read(decoder, null));
                }

                chunkLength = decoder.arrayNext();
            }

            // toArray() copies, so the reused buffer never escapes.
            return new GenericArray(reusedList.toArray());
        }

        @Override
        public void skip(Decoder decoder) throws IOException {
            long chunkLength = decoder.readArrayStart();

            while (chunkLength > 0) {
                for (int i = 0; i < chunkLength; i += 1) {
                    elementReader.skip(decoder);
                }

                chunkLength = decoder.arrayNext();
            }
        }
    }

    /**
     * Reads an Avro map (string keys) using its chunked wire format. Holds reusable
     * key/value buffers, so instances are NOT thread-safe.
     */
    private static class MapReader implements FieldReader {

        private final FieldReader valueReader;
        private final List<Object> reusedKeyList = new ArrayList<>();
        private final List<Object> reusedValueList = new ArrayList<>();

        private MapReader(FieldReader valueReader) {
            this.valueReader = valueReader;
        }

        @Override
        public Object read(Decoder decoder, Object reuse) throws IOException {
            reusedKeyList.clear();
            reusedValueList.clear();

            long chunkLength = decoder.readMapStart();

            while (chunkLength > 0) {
                for (int i = 0; i < chunkLength; i += 1) {
                    // Keys are decoded as BinaryString, matching Paimon map key access.
                    reusedKeyList.add(STRING_READER.read(decoder, null));
                    reusedValueList.add(valueReader.read(decoder, null));
                }

                chunkLength = decoder.mapNext();
            }

            Map<Object, Object> map = new HashMap<>();
            Object[] keys = reusedKeyList.toArray();
            Object[] values = reusedValueList.toArray();
            for (int i = 0; i < keys.length; i++) {
                map.put(keys[i], values[i]);
            }

            return new GenericMap(map);
        }

        @Override
        public void skip(Decoder decoder) throws IOException {
            long chunkLength = decoder.readMapStart();

            while (chunkLength > 0) {
                for (int i = 0; i < chunkLength; i += 1) {
                    STRING_READER.skip(decoder);
                    valueReader.skip(decoder);
                }

                chunkLength = decoder.mapNext();
            }
        }
    }

    /**
     * Creates a top-level {@link RowReader} with an explicit projection: {@code projection[i]}
     * is the output column fed by the i-th projected field.
     */
    public RowReader createRowReader(Schema schema, List<DataType> fieldTypes, int[] projection) {
        return new RowReader(schema, fieldTypes, projection);
    }

    /** A {@link FieldReader} to read {@link InternalRow}, optionally with column projection. */
    public class RowReader implements FieldReader {

        private final FieldReader[] fieldReaders;
        private final int[] projection;
        // mapping[avroFieldIndex] -> output row positions to fill, or null to skip the field.
        private final int[][] mapping;

        public RowReader(Schema schema, List<DataType> fieldTypes) {
            // Identity projection: every field maps to its own position.
            this(schema, fieldTypes, IntStream.range(0, fieldTypes.size()).toArray());
        }

        public RowReader(Schema schema, List<DataType> fieldTypes, int[] projection) {
            List<Schema.Field> schemaFields = schema.getFields();
            this.fieldReaders = new FieldReader[schemaFields.size()];
            for (int i = 0, fieldsSize = schemaFields.size(); i < fieldsSize; i++) {
                Schema.Field field = schemaFields.get(i);
                DataType type = fieldTypes.get(i);
                fieldReaders[i] = visit(field.schema(), type);
            }
            this.projection = projection;

            // Size the mapping by fieldTypes so the Avro file may have FEWER fields than the
            // table schema (schema evolution); assumes fieldTypes.size() >= schema field
            // count — TODO(review): confirm this invariant is enforced by the caller.

            // Invert the projection: a source field may feed several output columns.
            @SuppressWarnings("unchecked")
            List<Integer>[] mapping = new List[fieldTypes.size()];
            for (int i = 0; i < projection.length; i++) {
                List<Integer> columns = mapping[projection[i]];
                if (columns == null) {
                    columns = new ArrayList<>();
                    mapping[projection[i]] = columns;
                }
                columns.add(i);
            }

            this.mapping = new int[fieldTypes.size()][];
            for (int i = 0; i < mapping.length; i++) {
                List<Integer> fields = mapping[i];
                if (fields != null) {
                    this.mapping[i] = fields.stream().mapToInt(Integer::intValue).toArray();
                }
            }
        }

        @Override
        public InternalRow read(Decoder decoder, Object reuse) throws IOException {
            // Reuse the caller's row only when its arity matches the projected width.
            GenericRow row;
            if (reuse instanceof GenericRow
                    && ((GenericRow) reuse).getFieldCount() == projection.length) {
                row = (GenericRow) reuse;
            } else {
                row = new GenericRow(projection.length);
            }

            for (int i = 0; i < fieldReaders.length; i += 1) {
                int[] columns = mapping[i];
                FieldReader reader = fieldReaders[i];
                if (columns == null) {
                    // Unprojected field: consume its bytes without materializing.
                    reader.skip(decoder);
                } else {
                    // Pass the first target column's old value as the reuse candidate.
                    Object value = reader.read(decoder, row.getField(columns[0]));
                    for (int column : columns) {
                        row.setField(column, value);
                    }
                }
            }
            return row;
        }

        @Override
        public void skip(Decoder decoder) throws IOException {
            for (FieldReader fieldReader : fieldReaders) {
                fieldReader.skip(decoder);
            }
        }
    }
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriter.java
b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriter.java
new file mode 100644
index 000000000..a790fcd94
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.avro;
+
+import org.apache.paimon.data.DataGetters;
+
+import org.apache.avro.io.Encoder;
+
+import java.io.IOException;
+
/** Writer to write a single field value to an Avro binary {@link Encoder}. */
public interface FieldWriter {

    /**
     * Writes the value at {@code index} of {@code container} to the encoder.
     *
     * @param container the row, array, or other getter holding the value
     * @param index the position of the value within the container
     * @param encoder the Avro encoder to write to
     */
    void write(DataGetters container, int index, Encoder encoder) throws IOException;
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java
new file mode 100644
index 000000000..0bc37894b
--- /dev/null
+++
b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.avro;
+
+import org.apache.paimon.data.DataGetters;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.DataType;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.util.Utf8;
+
+import java.io.IOException;
+import java.util.List;
+
/**
 * Factory to create {@link FieldWriter}s by visiting an Avro schema alongside its Paimon
 * {@link DataType}s. Writers are stateless lambdas except {@link RowWriter}, which caches
 * its per-field writers.
 */
public class FieldWriterFactory implements AvroSchemaVisitor<FieldWriter> {

    private static final FieldWriter STRING_WRITER =
            (container, i, encoder) ->
                    // Wrap the BinaryString bytes in a Utf8 so the encoder writes them
                    // without a String round-trip.
                    encoder.writeString(new Utf8(container.getString(i).toBytes()));

    private static final FieldWriter BYTES_WRITER =
            (container, i, encoder) -> encoder.writeBytes(container.getBinary(i));

    private static final FieldWriter BOOLEAN_WRITER =
            (container, i, encoder) -> encoder.writeBoolean(container.getBoolean(i));

    private static final FieldWriter INT_WRITER =
            (container, i, encoder) -> encoder.writeInt(container.getInt(i));

    // TINYINT/SMALLINT widen to Avro int on the wire.
    private static final FieldWriter TINYINT_WRITER =
            (container, i, encoder) -> encoder.writeInt(container.getByte(i));

    private static final FieldWriter SMALLINT_WRITER =
            (container, i, encoder) -> encoder.writeInt(container.getShort(i));

    private static final FieldWriter BIGINT_WRITER =
            (container, i, encoder) -> encoder.writeLong(container.getLong(i));

    private static final FieldWriter FLOAT_WRITER =
            (container, i, encoder) -> encoder.writeFloat(container.getFloat(i));

    private static final FieldWriter DOUBLE_WRITER =
            (container, i, encoder) -> encoder.writeDouble(container.getDouble(i));

    @Override
    public FieldWriter visitUnion(Schema schema, DataType type) {
        // Assumes the union has the shape ["null", actualType] (null branch at index 0),
        // mirroring the reader side — TODO(review): confirm the schema converter order.
        return new NullableWriter(visit(schema.getTypes().get(1), type));
    }

    @Override
    public FieldWriter visitString() {
        return STRING_WRITER;
    }

    @Override
    public FieldWriter visitBytes() {
        return BYTES_WRITER;
    }

    @Override
    public FieldWriter visitInt() {
        return INT_WRITER;
    }

    @Override
    public FieldWriter visitTinyInt() {
        return TINYINT_WRITER;
    }

    @Override
    public FieldWriter visitSmallInt() {
        return SMALLINT_WRITER;
    }

    @Override
    public FieldWriter visitBoolean() {
        return BOOLEAN_WRITER;
    }

    @Override
    public FieldWriter visitBigInt() {
        return BIGINT_WRITER;
    }

    @Override
    public FieldWriter visitFloat() {
        return FLOAT_WRITER;
    }

    @Override
    public FieldWriter visitDouble() {
        return DOUBLE_WRITER;
    }

    @Override
    public FieldWriter visitTimestampMillis(int precision) {
        return (container, i, encoder) ->
                encoder.writeLong(container.getTimestamp(i, precision).getMillisecond());
    }

    @Override
    public FieldWriter visitTimestampMicros(int precision) {
        return (container, i, encoder) ->
                encoder.writeLong(container.getTimestamp(i, precision).toMicros());
    }

    @Override
    public FieldWriter visitDecimal(int precision, int scale) {
        return (container, index, encoder) -> {
            // Decimal on the wire: bytes holding the unscaled two's-complement value.
            Decimal decimal = container.getDecimal(index, precision, scale);
            encoder.writeBytes(decimal.toUnscaledBytes());
        };
    }

    @Override
    public FieldWriter visitArray(Schema schema, DataType elementType) {
        FieldWriter elementWriter = visit(schema.getElementType(), elementType);
        return (container, index, encoder) -> {
            InternalArray array = container.getArray(index);
            // Avro array protocol: writeArrayStart / setItemCount / startItem per element /
            // writeArrayEnd — the call order is part of the wire format.
            encoder.writeArrayStart();
            int numElements = array.size();
            encoder.setItemCount(numElements);
            for (int i = 0; i < numElements; i += 1) {
                encoder.startItem();
                elementWriter.write(array, i, encoder);
            }
            encoder.writeArrayEnd();
        };
    }

    @Override
    public FieldWriter visitMap(Schema schema, DataType valueType) {
        FieldWriter valueWriter = visit(schema.getValueType(), valueType);
        return (container, index, encoder) -> {
            InternalMap map = container.getMap(index);
            encoder.writeMapStart();
            int numElements = map.size();
            encoder.setItemCount(numElements);
            InternalArray keyArray = map.keyArray();
            InternalArray valueArray = map.valueArray();
            for (int i = 0; i < numElements; i += 1) {
                encoder.startItem();
                // Avro map keys are always strings.
                STRING_WRITER.write(keyArray, i, encoder);
                valueWriter.write(valueArray, i, encoder);
            }
            encoder.writeMapEnd();
        };
    }

    @Override
    public FieldWriter visitRecord(Schema schema, List<DataType> fieldTypes) {
        return new RowWriter(schema, fieldTypes);
    }

    /** Writer for a nullable (union) field: branch index 0 is null, index 1 the value. */
    private static class NullableWriter implements FieldWriter {

        private final FieldWriter writer;

        public NullableWriter(FieldWriter writer) {
            this.writer = writer;
        }

        @Override
        public void write(DataGetters container, int index, Encoder encoder) throws IOException {
            if (container.isNullAt(index)) {
                encoder.writeIndex(0);
            } else {
                encoder.writeIndex(1);
                writer.write(container, index, encoder);
            }
        }
    }

    /** A {@link FieldWriter} to write {@link InternalRow}. */
    public class RowWriter implements FieldWriter {

        private final FieldWriter[] fieldWriters;

        private RowWriter(Schema schema, List<DataType> fieldTypes) {
            List<Schema.Field> schemaFields = schema.getFields();
            this.fieldWriters = new FieldWriter[schemaFields.size()];
            for (int i = 0, fieldsSize = schemaFields.size(); i < fieldsSize; i++) {
                Schema.Field field = schemaFields.get(i);
                DataType type = fieldTypes.get(i);
                fieldWriters[i] = visit(field.schema(), type);
            }
        }

        @Override
        public void write(DataGetters container, int index, Encoder encoder) throws IOException {
            // Nested-record case: extract the row from the parent container, then recurse.
            InternalRow row = container.getRow(index, fieldWriters.length);
            writeRow(row, encoder);
        }

        /** Writes a top-level row: all fields in schema order. */
        public void writeRow(InternalRow row, Encoder encoder) throws IOException {
            for (int i = 0; i < fieldWriters.length; i += 1) {
                fieldWriters[i].write(row, i, encoder);
            }
        }
    }

    /** Creates a top-level {@link RowWriter} for the given record schema. */
    public RowWriter createRowWriter(Schema schema, List<DataType> fieldTypes) {
        return new RowWriter(schema, fieldTypes);
    }
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/avro/RowDataToAvroConverters.java
b/paimon-format/src/main/java/org/apache/paimon/format/avro/RowDataToAvroConverters.java
deleted file mode 100644
index 76bcb380c..000000000
---
a/paimon-format/src/main/java/org/apache/paimon/format/avro/RowDataToAvroConverters.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.format.avro;
-
-import org.apache.paimon.data.Decimal;
-import org.apache.paimon.data.InternalArray;
-import org.apache.paimon.data.InternalMap;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.types.ArrayType;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.RowType;
-
-import org.apache.avro.LogicalType;
-import org.apache.avro.LogicalTypes;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.util.Utf8;
-
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static
org.apache.paimon.format.avro.AvroSchemaConverter.extractValueTypeToAvroMap;
-
-/** Tool class used to convert from {@link InternalRow} to Avro {@link
GenericRecord}. */
-public class RowDataToAvroConverters {
-
- //
--------------------------------------------------------------------------------
- // Runtime Converters
- //
--------------------------------------------------------------------------------
-
- /**
- * Runtime converter that converts objects of Paimon internal data
structures to corresponding
- * Avro data structures.
- */
- @FunctionalInterface
- public interface RowDataToAvroConverter extends Serializable {
- Object convert(Schema schema, Object object);
- }
-
- //
--------------------------------------------------------------------------------
- // IMPORTANT! We use anonymous classes instead of lambdas for a reason
here. It is
- // necessary because the maven shade plugin cannot relocate classes in
- // SerializedLambdas (MSHADE-260). On the other hand we want to relocate
Avro for
- // sql-client uber jars.
- //
--------------------------------------------------------------------------------
-
- /**
- * Creates a runtime converter according to the given logical type that
converts objects of
- * Paimon internal data structures to corresponding Avro data structures.
- */
- public static RowDataToAvroConverter createConverter(DataType type) {
- final RowDataToAvroConverter converter;
- switch (type.getTypeRoot()) {
- case TINYINT:
- converter =
- new RowDataToAvroConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(Schema schema, Object
object) {
- return ((Byte) object).intValue();
- }
- };
- break;
- case SMALLINT:
- converter =
- new RowDataToAvroConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(Schema schema, Object
object) {
- return ((Short) object).intValue();
- }
- };
- break;
- case BOOLEAN: // boolean
- case INTEGER: // int
- case BIGINT: // long
- case FLOAT: // float
- case DOUBLE: // double
- case TIME_WITHOUT_TIME_ZONE: // int
- case DATE: // int
- converter =
- new RowDataToAvroConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(Schema schema, Object
object) {
- return object;
- }
- };
- break;
- case CHAR:
- case VARCHAR:
- converter =
- new RowDataToAvroConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(Schema schema, Object
object) {
- return new Utf8(object.toString());
- }
- };
- break;
- case BINARY:
- case VARBINARY:
- converter =
- new RowDataToAvroConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(Schema schema, Object
object) {
- return ByteBuffer.wrap((byte[]) object);
- }
- };
- break;
- case TIMESTAMP_WITHOUT_TIME_ZONE:
- converter =
- new RowDataToAvroConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(Schema schema, Object
object) {
- LogicalType logicalType =
schema.getLogicalType();
-
- if (logicalType instanceof
LogicalTypes.TimestampMillis) {
- return ((Timestamp)
object).toInstant().toEpochMilli();
- } else if (logicalType instanceof
LogicalTypes.TimestampMicros) {
- return ((Timestamp) object).toMicros();
- } else {
- throw new UnsupportedOperationException(
- "Unsupported timestamp type: " +
logicalType);
- }
- }
- };
- break;
- case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- converter =
- new RowDataToAvroConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(Schema schema, Object
object) {
- LogicalType logicalType =
schema.getLogicalType();
-
- if (logicalType instanceof
LogicalTypes.LocalTimestampMillis) {
- return ((Timestamp)
object).toInstant().toEpochMilli();
- } else if (logicalType
- instanceof
LogicalTypes.LocalTimestampMicros) {
- return ((Timestamp) object).toMicros();
- } else {
- throw new UnsupportedOperationException(
- "Unsupported timestamp type: " +
logicalType);
- }
- }
- };
- break;
- case DECIMAL:
- converter =
- new RowDataToAvroConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(Schema schema, Object
object) {
- return ByteBuffer.wrap(((Decimal)
object).toUnscaledBytes());
- }
- };
- break;
- case ARRAY:
- converter = createArrayConverter((ArrayType) type);
- break;
- case ROW:
- converter = createRowConverter((RowType) type);
- break;
- case MAP:
- case MULTISET:
- converter = createMapConverter(type);
- break;
- default:
- throw new UnsupportedOperationException("Unsupported type: " +
type);
- }
-
- // wrap into nullable converter
- return new RowDataToAvroConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(Schema schema, Object object) {
- if (object == null) {
- return null;
- }
-
- // get actual schema if it is a nullable schema
- Schema actualSchema;
- if (schema.getType() == Schema.Type.UNION) {
- List<Schema> types = schema.getTypes();
- int size = types.size();
- if (size == 2 && types.get(1).getType() ==
Schema.Type.NULL) {
- actualSchema = types.get(0);
- } else if (size == 2 && types.get(0).getType() ==
Schema.Type.NULL) {
- actualSchema = types.get(1);
- } else {
- throw new IllegalArgumentException(
- "The Avro schema is not a nullable type: " +
schema);
- }
- } else {
- actualSchema = schema;
- }
- return converter.convert(actualSchema, object);
- }
- };
- }
-
- private static RowDataToAvroConverter createRowConverter(RowType rowType) {
- final RowDataToAvroConverter[] fieldConverters =
- rowType.getFieldTypes().stream()
- .map(RowDataToAvroConverters::createConverter)
- .toArray(RowDataToAvroConverter[]::new);
- final DataType[] fieldTypes =
-
rowType.getFields().stream().map(DataField::type).toArray(DataType[]::new);
- final InternalRow.FieldGetter[] fieldGetters =
- new InternalRow.FieldGetter[fieldTypes.length];
- for (int i = 0; i < fieldTypes.length; i++) {
- fieldGetters[i] = InternalRow.createFieldGetter(fieldTypes[i], i);
- }
- final int length = rowType.getFieldCount();
-
- return new RowDataToAvroConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(Schema schema, Object object) {
- final InternalRow row = (InternalRow) object;
- final List<Schema.Field> fields = schema.getFields();
- final GenericRecord record = new GenericData.Record(schema);
- for (int i = 0; i < length; ++i) {
- final Schema.Field schemaField = fields.get(i);
- try {
- Object avroObject =
- fieldConverters[i].convert(
- schemaField.schema(),
fieldGetters[i].getFieldOrNull(row));
- record.put(i, avroObject);
- } catch (Throwable t) {
- throw new RuntimeException(
- String.format(
- "Fail to serialize at field: %s.",
schemaField.name()),
- t);
- }
- }
- return record;
- }
- };
- }
-
- private static RowDataToAvroConverter createArrayConverter(ArrayType
arrayType) {
- DataType elementType = arrayType.getElementType();
- final InternalArray.ElementGetter elementGetter =
- InternalArray.createElementGetter(elementType);
- final RowDataToAvroConverter elementConverter =
createConverter(arrayType.getElementType());
-
- return new RowDataToAvroConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(Schema schema, Object object) {
- final Schema elementSchema = schema.getElementType();
- InternalArray arrayData = (InternalArray) object;
- List<Object> list = new ArrayList<>();
- for (int i = 0; i < arrayData.size(); ++i) {
- list.add(
- elementConverter.convert(
- elementSchema,
elementGetter.getElementOrNull(arrayData, i)));
- }
- return list;
- }
- };
- }
-
- private static RowDataToAvroConverter createMapConverter(DataType type) {
- DataType valueType = extractValueTypeToAvroMap(type);
- final InternalArray.ElementGetter valueGetter =
- InternalArray.createElementGetter(valueType);
- final RowDataToAvroConverter valueConverter =
createConverter(valueType);
-
- return new RowDataToAvroConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(Schema schema, Object object) {
- final Schema valueSchema = schema.getValueType();
- final InternalMap mapData = (InternalMap) object;
- final InternalArray keyArray = mapData.keyArray();
- final InternalArray valueArray = mapData.valueArray();
- final Map<Object, Object> map = new HashMap<>(mapData.size());
- for (int i = 0; i < mapData.size(); ++i) {
- final String key = keyArray.getString(i).toString();
- final Object value =
- valueConverter.convert(
- valueSchema,
valueGetter.getElementOrNull(valueArray, i));
- map.put(key, value);
- }
- return map;
- }
- };
- }
-}
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroBulkFormatTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroBulkFormatTest.java
deleted file mode 100644
index b73cde5cc..000000000
---
a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroBulkFormatTest.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.format.avro;
-
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.utils.FileIOUtils;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumWriter;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.paimon.format.avro.AvroBulkFormatTestUtils.ROW_TYPE;
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Tests for {@link AbstractAvroBulkFormat}. */
-class AvroBulkFormatTest {
-
- private static final long TIMESTAMP = System.currentTimeMillis();
-
- private static final List<InternalRow> TEST_DATA =
- Arrays.asList(
- // -------- batch 0, block start 232 --------
- GenericRow.of(
- BinaryString.fromString("AvroBulk"),
- BinaryString.fromString("FormatTest"),
- Timestamp.fromEpochMillis(TIMESTAMP),
- Timestamp.fromMicros(TIMESTAMP * 1000 + 123),
- Timestamp.fromEpochMillis(TIMESTAMP),
- Timestamp.fromMicros(TIMESTAMP * 1000 + 123)),
- GenericRow.of(
- BinaryString.fromString("Apache"),
- BinaryString.fromString("Paimon"),
- Timestamp.fromEpochMillis(TIMESTAMP),
- Timestamp.fromMicros(TIMESTAMP * 1000 + 123),
- Timestamp.fromEpochMillis(TIMESTAMP),
- Timestamp.fromMicros(TIMESTAMP * 1000 + 123)),
- GenericRow.of(
- BinaryString.fromString(
- "永和九年,岁在癸丑,暮春之初,会于会稽山阴之兰亭,修禊事也。群贤毕至,少"
- + "长咸集。此地有崇山峻岭,茂林修竹,又有清流激湍,映带左右。引"
- + "以为流觞曲水,列坐其次。虽无丝竹管弦之盛,一觞一咏,亦足以畅"
- + "叙幽情。"),
- BinaryString.fromString(""),
- Timestamp.fromEpochMillis(TIMESTAMP),
- Timestamp.fromMicros(TIMESTAMP * 1000 + 123),
- Timestamp.fromEpochMillis(TIMESTAMP),
- Timestamp.fromMicros(TIMESTAMP * 1000 + 123)),
- // -------- batch 1, block start 689 --------
- GenericRow.of(
- BinaryString.fromString("File"),
- BinaryString.fromString("Format"),
- Timestamp.fromEpochMillis(TIMESTAMP),
- Timestamp.fromMicros(TIMESTAMP * 1000 + 123),
- Timestamp.fromEpochMillis(TIMESTAMP),
- Timestamp.fromMicros(TIMESTAMP * 1000 + 123)),
- GenericRow.of(
- null,
- BinaryString.fromString(
- "This is a string with English, 中文 and
even 🍎🍌🍑🥝🍍🥭🍐"),
- Timestamp.fromEpochMillis(TIMESTAMP),
- Timestamp.fromMicros(TIMESTAMP * 1000 + 123),
- Timestamp.fromEpochMillis(TIMESTAMP),
- Timestamp.fromMicros(TIMESTAMP * 1000 + 123)),
- // -------- batch 2, block start 1147 --------
- GenericRow.of(
- BinaryString.fromString("block with"),
- BinaryString.fromString("only one record"),
- Timestamp.fromEpochMillis(TIMESTAMP),
- Timestamp.fromMicros(TIMESTAMP * 1000 + 123),
- Timestamp.fromEpochMillis(TIMESTAMP),
- Timestamp.fromMicros(TIMESTAMP * 1000 + 123))
- // -------- file length 1323 --------
- );
- private static final List<Long> BLOCK_STARTS = Arrays.asList(689L, 1147L,
1323L);
-
- private File tmpFile;
-
- @BeforeEach
- public void before() throws IOException {
- tmpFile = Files.createTempFile("avro-bulk-format-test",
".avro").toFile();
- tmpFile.createNewFile();
- FileOutputStream out = new FileOutputStream(tmpFile);
-
- Schema schema = AvroSchemaConverter.convertToSchema(ROW_TYPE);
- RowDataToAvroConverters.RowDataToAvroConverter converter =
- RowDataToAvroConverters.createConverter(ROW_TYPE);
-
- DatumWriter<GenericRecord> datumWriter = new
GenericDatumWriter<>(schema);
- DataFileWriter<GenericRecord> dataFileWriter = new
DataFileWriter<>(datumWriter);
- dataFileWriter.create(schema, out);
-
- // Generate the sync points manually in order to test blocks.
- long syncBlock1 = dataFileWriter.sync();
- dataFileWriter.append((GenericRecord) converter.convert(schema,
TEST_DATA.get(0)));
- dataFileWriter.append((GenericRecord) converter.convert(schema,
TEST_DATA.get(1)));
- dataFileWriter.append((GenericRecord) converter.convert(schema,
TEST_DATA.get(2)));
- long syncBlock2 = dataFileWriter.sync();
- dataFileWriter.append((GenericRecord) converter.convert(schema,
TEST_DATA.get(3)));
- dataFileWriter.append((GenericRecord) converter.convert(schema,
TEST_DATA.get(4)));
- long syncBlock3 = dataFileWriter.sync();
- dataFileWriter.append((GenericRecord) converter.convert(schema,
TEST_DATA.get(5)));
- long syncEnd = dataFileWriter.sync();
- dataFileWriter.close();
-
- // These values should be constant if nothing else changes with the
file.
- assertThat(BLOCK_STARTS).isEqualTo(Arrays.asList(syncBlock1,
syncBlock2, syncBlock3));
- assertThat(tmpFile).hasSize(syncEnd);
- }
-
- @AfterEach
- public void after() throws IOException {
- FileIOUtils.deleteFileOrDirectory(tmpFile);
- }
-
- @Test
- void testReadWholeFileWithOneSplit() throws IOException {
- AvroBulkFormatTestUtils.TestingAvroBulkFormat bulkFormat =
- new AvroBulkFormatTestUtils.TestingAvroBulkFormat();
- RecordReader<InternalRow> reader =
- bulkFormat.createReader(new LocalFileIO(), new
Path(tmpFile.toString()));
- AtomicInteger i = new AtomicInteger(0);
- reader.forEachRemaining(
- rowData ->
assertThat(rowData).isEqualTo(TEST_DATA.get(i.getAndIncrement())));
- }
-}
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroBulkFormatTestUtils.java
b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroBulkFormatTestUtils.java
deleted file mode 100644
index b68a6379b..000000000
---
a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroBulkFormatTestUtils.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.format.avro;
-
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.RowType;
-
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-
-import java.util.function.Function;
-
-/** Testing utils for tests related to {@link AbstractAvroBulkFormat}. */
-public class AvroBulkFormatTestUtils {
-
- public static final RowType ROW_TYPE =
- (RowType)
- RowType.builder()
- .fields(
- new DataType[] {
- DataTypes.STRING(),
- DataTypes.STRING(),
- DataTypes.TIMESTAMP(3),
- DataTypes.TIMESTAMP(6),
-
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3),
-
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6),
- },
- new String[] {
- "a",
- "b",
- "timestamp_millis",
- "timestamp_micros",
- "local_timestamp_millis",
- "local_timestamp_micros"
- })
- .build()
- .notNull();
-
- /** {@link AbstractAvroBulkFormat} for tests. */
- public static class TestingAvroBulkFormat extends
AbstractAvroBulkFormat<GenericRecord> {
-
- protected TestingAvroBulkFormat() {
- super(AvroSchemaConverter.convertToSchema(ROW_TYPE));
- }
-
- @Override
- protected GenericRecord createReusedAvroRecord() {
- return new GenericData.Record(readerSchema);
- }
-
- @Override
- protected Function<GenericRecord, InternalRow> createConverter() {
- AvroToRowDataConverters.AvroToRowDataConverter converter =
- AvroToRowDataConverters.createRowConverter(ROW_TYPE);
- return record -> record == null ? null : (InternalRow)
converter.convert(record);
- }
- }
-}
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFormatReadWriteTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFormatReadWriteTest.java
new file mode 100644
index 000000000..6202b7a7a
--- /dev/null
+++
b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFormatReadWriteTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.avro;
+
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FormatReadWriteTest;
+import org.apache.paimon.options.Options;
+
+/** An avro {@link FormatReadWriteTest}. */
+public class AvroFormatReadWriteTest extends FormatReadWriteTest {
+
+ @Override
+ protected FileFormat fileFormat() {
+ return new AvroFileFormat(new Options());
+ }
+}
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroToRowDataConvertersTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroToRowDataConvertersTest.java
deleted file mode 100644
index 31cac15a3..000000000
---
a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroToRowDataConvertersTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.format.avro;
-
-import org.apache.paimon.data.Timestamp;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import java.time.Instant;
-
-/** Test for avro to row data converters. */
-public class AvroToRowDataConvertersTest {
- private static final Timestamp NOW = Timestamp.now();
- private static final long TS_MILLIS = NOW.getMillisecond();
- private static final long TS_MICROS = NOW.toMicros() + 123L;
- private static final Timestamp NOW_MICROS =
Timestamp.fromMicros(TS_MICROS);
- private static final Instant INSTANT = Instant.ofEpochMilli(TS_MILLIS);
-
- @Test
- public void testConvertToTimestamp() {
- Assertions.assertEquals(NOW,
AvroToRowDataConverters.convertToTimestamp(TS_MILLIS, 3));
-
- Assertions.assertEquals(
- NOW_MICROS,
AvroToRowDataConverters.convertToTimestamp(TS_MICROS, 6));
-
- Assertions.assertEquals(NOW,
AvroToRowDataConverters.convertToTimestamp(INSTANT, 3));
-
- Assertions.assertEquals(NOW,
AvroToRowDataConverters.convertToTimestamp(INSTANT, 6));
- }
-}
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/avro/RowDataToAvroConvertersTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/avro/RowDataToAvroConvertersTest.java
deleted file mode 100644
index 9b38176aa..000000000
---
a/paimon-format/src/test/java/org/apache/paimon/format/avro/RowDataToAvroConvertersTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.format.avro;
-
-import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.LocalZonedTimestampType;
-
-import org.apache.avro.Schema;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import java.time.LocalDate;
-import java.time.LocalTime;
-import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
-
-/** Test for row data to avro converters. */
-public class RowDataToAvroConvertersTest {
-
- @Test
- public void testTimestampWithTimeType() {
- LocalZonedTimestampType zonedTimestampType =
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3);
- RowDataToAvroConverters.RowDataToAvroConverter rowDataToAvroConverter =
- RowDataToAvroConverters.createConverter(zonedTimestampType);
-
- OffsetDateTime offsetDateTime =
- OffsetDateTime.of(LocalDate.of(2023, 1, 1), LocalTime.of(0, 0,
0), ZoneOffset.UTC);
-
- Schema schema =
AvroSchemaConverter.convertToSchema(zonedTimestampType);
- long timestamp = offsetDateTime.toInstant().toEpochMilli();
- Timestamp millis = Timestamp.fromEpochMillis(timestamp);
- Object converted = rowDataToAvroConverter.convert(schema, millis);
-
- AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
- AvroToRowDataConverters.createConverter(zonedTimestampType);
-
- Object original = avroToRowDataConverter.convert(converted);
- Assertions.assertEquals(millis, original);
- }
-}