This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 7c4cb42a1 PARQUET-1822: Avoid requiring Hadoop installation for
reading/writing (#1111)
7c4cb42a1 is described below
commit 7c4cb42a1c6f4fb2e4a00f208bc9d073e5f7a340
Author: Atour <[email protected]>
AuthorDate: Tue Jul 4 04:07:00 2023 +0200
PARQUET-1822: Avoid requiring Hadoop installation for reading/writing
(#1111)
---
.../org/apache/parquet/avro/TestReadWrite.java | 72 +++++++++-----
.../java/org/apache/parquet/io/LocalInputFile.java | 102 ++++++++++++++++++++
.../org/apache/parquet/io/LocalOutputFile.java | 107 +++++++++++++++++++++
.../java/org/apache/parquet/io/OutputFile.java | 25 +++++
.../apache/parquet/io/TestLocalInputOutput.java | 92 ++++++++++++++++++
5 files changed, 374 insertions(+), 24 deletions(-)
diff --git
a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
index 6484ab4a6..81e751aba 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
@@ -27,6 +27,7 @@ import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
@@ -54,6 +55,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.LocalInputFile;
+import org.apache.parquet.io.LocalOutputFile;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.MessageTypeParser;
@@ -74,16 +77,19 @@ public class TestReadWrite {
@Parameterized.Parameters
public static Collection<Object[]> data() {
Object[][] data = new Object[][] {
- { false }, // use the new converters
- { true } }; // use the old converters
+ { false, false }, // use the new converters
+ { true, false }, // use the old converters
+ { false, true } }; // use a local disk location
return Arrays.asList(data);
}
private final boolean compat;
+ private final boolean local;
private final Configuration testConf = new Configuration();
- public TestReadWrite(boolean compat) {
+ public TestReadWrite(boolean compat, boolean local) {
this.compat = compat;
+ this.local = local;
this.testConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, compat);
testConf.setBoolean("parquet.avro.add-list-element-records", false);
testConf.setBoolean("parquet.avro.write-old-list-structure", false);
@@ -92,24 +98,20 @@ public class TestReadWrite {
@Test
public void testEmptyArray() throws Exception {
Schema schema = new Schema.Parser().parse(
- Resources.getResource("array.avsc").openStream());
+ Resources.getResource("array.avsc").openStream());
// Write a record with an empty array.
List<Integer> emptyArray = new ArrayList<>();
- Path file = new Path(createTempFile().getPath());
+ String file = createTempFile().getPath();
- try(ParquetWriter<GenericRecord> writer = AvroParquetWriter
- .<GenericRecord>builder(file)
- .withSchema(schema)
- .withConf(testConf)
- .build()) {
+ try(ParquetWriter<GenericRecord> writer = writer(file, schema)) {
GenericData.Record record = new GenericRecordBuilder(schema)
.set("myarray", emptyArray).build();
writer.write(record);
}
- try (AvroParquetReader<GenericRecord> reader = new
AvroParquetReader<>(testConf, file)) {
+ try (ParquetReader<GenericRecord> reader = reader(file)) {
GenericRecord nextRecord = reader.read();
assertNotNull(nextRecord);
@@ -120,16 +122,12 @@ public class TestReadWrite {
@Test
public void testEmptyMap() throws Exception {
Schema schema = new Schema.Parser().parse(
- Resources.getResource("map.avsc").openStream());
+ Resources.getResource("map.avsc").openStream());
- Path file = new Path(createTempFile().getPath());
+ String file = createTempFile().getPath();
ImmutableMap<String, Integer> emptyMap = new ImmutableMap.Builder<String,
Integer>().build();
- try(ParquetWriter<GenericRecord> writer = AvroParquetWriter
- .<GenericRecord>builder(file)
- .withSchema(schema)
- .withConf(testConf)
- .build()) {
+ try (ParquetWriter<GenericRecord> writer = writer(file, schema)) {
// Write a record with an empty map.
GenericData.Record record = new GenericRecordBuilder(schema)
@@ -137,7 +135,7 @@ public class TestReadWrite {
writer.write(record);
}
- try(AvroParquetReader<GenericRecord> reader = new
AvroParquetReader<GenericRecord>(testConf, file)) {
+ try(ParquetReader<GenericRecord> reader = reader(file)) {
GenericRecord nextRecord = reader.read();
assertNotNull(nextRecord);
@@ -704,12 +702,10 @@ public class TestReadWrite {
public void testNestedLists() throws Exception {
Schema schema = new Schema.Parser().parse(
Resources.getResource("nested_array.avsc").openStream());
- Path file = new Path(createTempFile().getPath());
+ String file = createTempFile().getPath();
// Parquet writer
- ParquetWriter parquetWriter =
AvroParquetWriter.builder(file).withSchema(schema)
- .withConf(testConf)
- .build();
+ ParquetWriter parquetWriter = writer(file, schema);
Schema innerRecordSchema = schema.getField("l1").schema().getTypes()
.get(1).getElementType().getTypes().get(1);
@@ -723,7 +719,7 @@ public class TestReadWrite {
parquetWriter.write(record);
parquetWriter.close();
- AvroParquetReader<GenericRecord> reader = new AvroParquetReader(testConf,
file);
+ ParquetReader<GenericRecord> reader = reader(file);
GenericRecord nextRecord = reader.read();
assertNotNull(nextRecord);
@@ -867,6 +863,34 @@ public class TestReadWrite {
return tmp;
}
+ private ParquetWriter<GenericRecord> writer(String file, Schema schema)
throws IOException {
+ if (local) {
+ return AvroParquetWriter
+ .<GenericRecord>builder(new LocalOutputFile(Paths.get(file)))
+ .withSchema(schema)
+ .withConf(testConf)
+ .build();
+ } else {
+ return AvroParquetWriter
+ .<GenericRecord>builder(new Path(file))
+ .withSchema(schema)
+ .withConf(testConf)
+ .build();
+ }
+ }
+
+ private ParquetReader<GenericRecord> reader(String file) throws IOException {
+ if (local) {
+ return AvroParquetReader
+ .<GenericRecord>builder(new LocalInputFile(Paths.get(file)))
+ .withDataModel(GenericData.get())
+ .withConf(testConf)
+ .build();
+ } else {
+ return new AvroParquetReader(testConf, new Path(file));
+ }
+ }
+
/**
* Return a String or Utf8 depending on whether compatibility is on
*/
diff --git
a/parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java
b/parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java
new file mode 100644
index 000000000..7174b42d5
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+
+/**
+ * {@code LocalInputFile} is an implementation needed by Parquet to read
+ * from local data files using {@link SeekableInputStream} instances.
+ */
+public class LocalInputFile implements InputFile {
+
+ private final Path path;
+ private long length = -1;
+
+ public LocalInputFile(Path file) {
+ path = file;
+ }
+
+ @Override
+ public long getLength() throws IOException {
+ if (length == -1) {
+ try (RandomAccessFile file = new RandomAccessFile(path.toFile(), "r")) {
+ length = file.length();
+ }
+ }
+ return length;
+ }
+
+ @Override
+ public SeekableInputStream newStream() throws IOException {
+
+ return new SeekableInputStream() {
+
+ private final RandomAccessFile randomAccessFile = new
RandomAccessFile(path.toFile(), "r");
+
+ @Override
+ public int read() throws IOException {
+ return randomAccessFile.read();
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return randomAccessFile.getFilePointer();
+ }
+
+ @Override
+ public void seek(long newPos) throws IOException {
+ randomAccessFile.seek(newPos);
+ }
+
+ @Override
+ public void readFully(byte[] bytes) throws IOException {
+ randomAccessFile.readFully(bytes);
+ }
+
+ @Override
+ public void readFully(byte[] bytes, int start, int len) throws
IOException {
+ randomAccessFile.readFully(bytes, start, len);
+ }
+
+ @Override
+ public int read(ByteBuffer buf) throws IOException {
+ byte[] buffer = new byte[buf.remaining()];
+ int code = read(buffer);
+ buf.put(buffer, buf.position() + buf.arrayOffset(), buf.remaining());
+ return code;
+ }
+
+ @Override
+ public void readFully(ByteBuffer buf) throws IOException {
+ byte[] buffer = new byte[buf.remaining()];
+ readFully(buffer);
+ buf.put(buffer, buf.position() + buf.arrayOffset(), buf.remaining());
+ }
+
+ @Override
+ public void close() throws IOException {
+ randomAccessFile.close();
+ }
+ };
+ }
+}
diff --git
a/parquet-common/src/main/java/org/apache/parquet/io/LocalOutputFile.java
b/parquet-common/src/main/java/org/apache/parquet/io/LocalOutputFile.java
new file mode 100644
index 000000000..5925df988
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/io/LocalOutputFile.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * {@code LocalOutputFile} is an implementation needed by Parquet to write
+ * to local data files using {@link PositionOutputStream} instances.
+ */
+public class LocalOutputFile implements OutputFile {
+
+ private class LocalPositionOutputStream extends PositionOutputStream {
+
+ private final BufferedOutputStream stream;
+ private long pos = 0;
+
+ public LocalPositionOutputStream(int buffer, StandardOpenOption...
openOption) throws IOException {
+ stream = new BufferedOutputStream(Files.newOutputStream(path,
openOption), buffer);
+ }
+
+ @Override
+ public long getPos() {
+ return pos;
+ }
+
+ @Override
+ public void write(int data) throws IOException {
+ pos++;
+ stream.write(data);
+ }
+
+ @Override
+ public void write(byte[] data) throws IOException {
+ pos += data.length;
+ stream.write(data);
+ }
+
+ @Override
+ public void write(byte[] data, int off, int len) throws IOException {
+ pos += len;
+ stream.write(data, off, len);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ stream.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ stream.close();
+ }
+ }
+
+ private final Path path;
+
+ public LocalOutputFile(Path file) {
+ path = file;
+ }
+
+ @Override
+ public PositionOutputStream create(long buffer) throws IOException {
+ return new LocalPositionOutputStream((int) buffer,
StandardOpenOption.CREATE_NEW);
+ }
+
+ @Override
+ public PositionOutputStream createOrOverwrite(long buffer) throws
IOException {
+ return new LocalPositionOutputStream((int) buffer,
StandardOpenOption.CREATE,
+ StandardOpenOption.TRUNCATE_EXISTING);
+ }
+
+ @Override
+ public boolean supportsBlockSize() {
+ return true;
+ }
+
+ @Override
+ public long defaultBlockSize() {
+ return 512;
+ }
+
+ @Override
+ public String getPath() {
+ return path.toString();
+ }
+}
diff --git a/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java
b/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java
index 77331758b..fede91361 100644
--- a/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java
+++ b/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java
@@ -21,16 +21,41 @@ package org.apache.parquet.io;
import java.io.IOException;
+/**
+ * {@code OutputFile} is an interface with the methods needed by Parquet to
write
+ * data files using {@link PositionOutputStream} instances.
+ */
public interface OutputFile {

  /**
   * Opens a new {@link PositionOutputStream} for the data file to create.
   *
   * @param blockSizeHint the recommended block size (in bytes) for the target
   *        file system; implementations may ignore it
   * @return a new {@link PositionOutputStream} to write the file
   * @throws IOException if the stream cannot be opened
   */
  PositionOutputStream create(long blockSizeHint) throws IOException;

  /**
   * Opens a new {@link PositionOutputStream} for the data file to create or overwrite.
   *
   * @param blockSizeHint the recommended block size (in bytes) for the target
   *        file system; implementations may ignore it
   * @return a new {@link PositionOutputStream} to write the file
   * @throws IOException if the stream cannot be opened
   */
  PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException;

  /**
   * @return a flag indicating if block size is supported.
   */
  boolean supportsBlockSize();

  /**
   * @return the default block size.
   */
  long defaultBlockSize();

  /**
   * @return the path of the file, as a {@link String}, or {@code null} if the
   *         implementation does not override this default.
   */
  default String getPath() {
    return null;
  }
}
diff --git
a/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java
b/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java
new file mode 100644
index 000000000..d13cc7f71
--- /dev/null
+++
b/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class TestLocalInputOutput {
+
+ @Test
+ public void outputFileOverwritesFile() throws IOException {
+ Path path = Paths.get(createTempFile().getPath());
+ OutputFile write = new LocalOutputFile(path);
+ try (PositionOutputStream stream = write.createOrOverwrite(512)) {
+ stream.write(124);
+ }
+ try (PositionOutputStream stream = write.createOrOverwrite(512)) {
+ stream.write(124);
+ }
+ InputFile read = new LocalInputFile(path);
+ try (SeekableInputStream stream = read.newStream()) {
+ assertEquals(stream.read(), 124);
+ assertEquals(stream.read(), -1);
+ }
+ }
+
+ @Test
+ public void outputFileCreateFailsAsFileAlreadyExists() throws IOException {
+ Path path = Paths.get(createTempFile().getPath());
+ OutputFile write = new LocalOutputFile(path);
+ write.create(512).close();
+ assertThrows(FileAlreadyExistsException.class, () ->
write.create(512).close());
+ }
+
+ @Test
+ public void outputFileCreatesFileWithOverwrite() throws IOException {
+ Path path = Paths.get(createTempFile().getPath());
+ OutputFile write = new LocalOutputFile(path);
+ try (PositionOutputStream stream = write.createOrOverwrite(512)) {
+ stream.write(255);
+ }
+ InputFile read = new LocalInputFile(path);
+ try (SeekableInputStream stream = read.newStream()) {
+ assertEquals(stream.read(), 255);
+ assertEquals(stream.read(), -1);
+ }
+ }
+
+ @Test
+ public void outputFileCreatesFile() throws IOException {
+ Path path = Paths.get(createTempFile().getPath());
+ OutputFile write = new LocalOutputFile(path);
+ try (PositionOutputStream stream = write.createOrOverwrite(512)) {
+ stream.write(2);
+ }
+ InputFile read = new LocalInputFile(path);
+ try (SeekableInputStream stream = read.newStream()) {
+ assertEquals(stream.read(), 2);
+ assertEquals(stream.read(), -1);
+ }
+ }
+
+ private File createTempFile() throws IOException {
+ File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+ tmp.deleteOnExit();
+ tmp.delete();
+ return tmp;
+ }
+}