This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c4cb42a1 PARQUET-1822: Avoid requiring Hadoop installation for reading/writing (#1111)
7c4cb42a1 is described below

commit 7c4cb42a1c6f4fb2e4a00f208bc9d073e5f7a340
Author: Atour <[email protected]>
AuthorDate: Tue Jul 4 04:07:00 2023 +0200

    PARQUET-1822: Avoid requiring Hadoop installation for reading/writing (#1111)
---
 .../org/apache/parquet/avro/TestReadWrite.java     |  72 +++++++++-----
 .../java/org/apache/parquet/io/LocalInputFile.java | 102 ++++++++++++++++++++
 .../org/apache/parquet/io/LocalOutputFile.java     | 107 +++++++++++++++++++++
 .../java/org/apache/parquet/io/OutputFile.java     |  25 +++++
 .../apache/parquet/io/TestLocalInputOutput.java    |  92 ++++++++++++++++++
 5 files changed, 374 insertions(+), 24 deletions(-)

diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
index 6484ab4a6..81e751aba 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
@@ -27,6 +27,7 @@ import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
 import java.time.LocalDate;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
@@ -54,6 +55,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.LocalInputFile;
+import org.apache.parquet.io.LocalOutputFile;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.api.RecordConsumer;
 import org.apache.parquet.schema.MessageTypeParser;
@@ -74,16 +77,19 @@ public class TestReadWrite {
   @Parameterized.Parameters
   public static Collection<Object[]> data() {
     Object[][] data = new Object[][] {
-        { false },  // use the new converters
-        { true } }; // use the old converters
+        { false, false },  // use the new converters
+        { true, false },   // use the old converters
+        { false, true } }; // use a local disk location
     return Arrays.asList(data);
   }
 
   private final boolean compat;
+  private final boolean local;
   private final Configuration testConf = new Configuration();
 
-  public TestReadWrite(boolean compat) {
+  public TestReadWrite(boolean compat, boolean local) {
     this.compat = compat;
+    this.local = local;
     this.testConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, compat);
     testConf.setBoolean("parquet.avro.add-list-element-records", false);
     testConf.setBoolean("parquet.avro.write-old-list-structure", false);
@@ -92,24 +98,20 @@ public class TestReadWrite {
   @Test
   public void testEmptyArray() throws Exception {
     Schema schema = new Schema.Parser().parse(
-        Resources.getResource("array.avsc").openStream());
+      Resources.getResource("array.avsc").openStream());
 
     // Write a record with an empty array.
     List<Integer> emptyArray = new ArrayList<>();
 
-    Path file = new Path(createTempFile().getPath());
+    String file = createTempFile().getPath();
 
-    try(ParquetWriter<GenericRecord> writer = AvroParquetWriter
-        .<GenericRecord>builder(file)
-        .withSchema(schema)
-        .withConf(testConf)
-        .build()) {
+    try(ParquetWriter<GenericRecord> writer = writer(file, schema)) {
       GenericData.Record record = new GenericRecordBuilder(schema)
         .set("myarray", emptyArray).build();
       writer.write(record);
     }
 
-    try (AvroParquetReader<GenericRecord> reader = new 
AvroParquetReader<>(testConf, file)) {
+    try (ParquetReader<GenericRecord> reader = reader(file)) {
       GenericRecord nextRecord = reader.read();
 
       assertNotNull(nextRecord);
@@ -120,16 +122,12 @@ public class TestReadWrite {
   @Test
   public void testEmptyMap() throws Exception {
     Schema schema = new Schema.Parser().parse(
-        Resources.getResource("map.avsc").openStream());
+      Resources.getResource("map.avsc").openStream());
 
-    Path file = new Path(createTempFile().getPath());
+    String file = createTempFile().getPath();
     ImmutableMap<String, Integer> emptyMap = new ImmutableMap.Builder<String, 
Integer>().build();
 
-    try(ParquetWriter<GenericRecord> writer = AvroParquetWriter
-        .<GenericRecord>builder(file)
-        .withSchema(schema)
-        .withConf(testConf)
-        .build()) {
+    try (ParquetWriter<GenericRecord> writer = writer(file, schema)) {
 
       // Write a record with an empty map.
       GenericData.Record record = new GenericRecordBuilder(schema)
@@ -137,7 +135,7 @@ public class TestReadWrite {
       writer.write(record);
     }
 
-    try(AvroParquetReader<GenericRecord> reader = new 
AvroParquetReader<GenericRecord>(testConf, file)) {
+    try(ParquetReader<GenericRecord> reader = reader(file)) {
       GenericRecord nextRecord = reader.read();
 
       assertNotNull(nextRecord);
@@ -704,12 +702,10 @@ public class TestReadWrite {
   public void testNestedLists() throws Exception {
     Schema schema = new Schema.Parser().parse(
       Resources.getResource("nested_array.avsc").openStream());
-    Path file = new Path(createTempFile().getPath());
+    String file = createTempFile().getPath();
 
     // Parquet writer
-    ParquetWriter parquetWriter = 
AvroParquetWriter.builder(file).withSchema(schema)
-      .withConf(testConf)
-      .build();
+    ParquetWriter parquetWriter = writer(file, schema);
 
     Schema innerRecordSchema = schema.getField("l1").schema().getTypes()
       .get(1).getElementType().getTypes().get(1);
@@ -723,7 +719,7 @@ public class TestReadWrite {
     parquetWriter.write(record);
     parquetWriter.close();
 
-    AvroParquetReader<GenericRecord> reader = new AvroParquetReader(testConf, 
file);
+    ParquetReader<GenericRecord> reader = reader(file);
     GenericRecord nextRecord = reader.read();
 
     assertNotNull(nextRecord);
@@ -867,6 +863,55 @@ public class TestReadWrite {
     return tmp;
   }
 
+  /**
+   * Creates a writer for {@code file} configured with {@code testConf}.
+   *
+   * <p>When the {@code local} test parameter is set, the file is written through
+   * {@link LocalOutputFile}; otherwise it is written through a Hadoop {@link Path}.
+   *
+   * @param file location of the Parquet file to write
+   * @param schema Avro schema handed to the writer builder
+   * @return a new writer; the caller is responsible for closing it
+   * @throws IOException if the writer cannot be created
+   */
+  private ParquetWriter<GenericRecord> writer(String file, Schema schema) throws IOException {
+    if (local) {
+      return AvroParquetWriter
+        .<GenericRecord>builder(new LocalOutputFile(Paths.get(file)))
+        .withSchema(schema)
+        .withConf(testConf)
+        .build();
+    } else {
+      return AvroParquetWriter
+        .<GenericRecord>builder(new Path(file))
+        .withSchema(schema)
+        .withConf(testConf)
+        .build();
+    }
+  }
+
+  /**
+   * Creates a reader for {@code file} configured with {@code testConf}.
+   *
+   * <p>When the {@code local} test parameter is set, the file is read through
+   * {@link LocalInputFile}; otherwise it is read through a Hadoop {@link Path}.
+   *
+   * @param file location of the Parquet file to read
+   * @return a new reader; the caller is responsible for closing it
+   * @throws IOException if the reader cannot be created
+   */
+  private ParquetReader<GenericRecord> reader(String file) throws IOException {
+    if (local) {
+      return AvroParquetReader
+        .<GenericRecord>builder(new LocalInputFile(Paths.get(file)))
+        .withDataModel(GenericData.get())
+        .withConf(testConf)
+        .build();
+    } else {
+      return new AvroParquetReader<>(testConf, new Path(file));
+    }
+  }
+
   /**
    * Return a String or Utf8 depending on whether compatibility is on
    */
diff --git a/parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java
new file mode 100644
index 000000000..7174b42d5
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+
+/**
+ * {@code LocalInputFile} is an implementation needed by Parquet to read
+ * from local data files using {@link SeekableInputStream} instances.
+ */
+public class LocalInputFile implements InputFile {
+
+  private final Path path;
+  // Cached file length in bytes; -1 until getLength() has looked it up.
+  private long length = -1;
+
+  /**
+   * @param file path of the local file to read
+   */
+  public LocalInputFile(Path file) {
+    path = file;
+  }
+
+  /**
+   * Returns the file length, opening the file to look it up on the first call
+   * and returning the cached value on later calls.
+   *
+   * @return the length of the file in bytes
+   * @throws IOException if the file cannot be opened or its length read
+   */
+  @Override
+  public long getLength() throws IOException {
+    if (length == -1) {
+      try (RandomAccessFile file = new RandomAccessFile(path.toFile(), "r")) {
+        length = file.length();
+      }
+    }
+    return length;
+  }
+
+  /**
+   * Opens the file read-only and returns a stream backed by it. The underlying
+   * {@link RandomAccessFile} is released when the returned stream is closed.
+   *
+   * @return a new stream over the file
+   * @throws IOException if the file cannot be opened
+   */
+  @Override
+  public SeekableInputStream newStream() throws IOException {
+
+    return new SeekableInputStream() {
+
+      private final RandomAccessFile randomAccessFile = new RandomAccessFile(path.toFile(), "r");
+
+      @Override
+      public int read() throws IOException {
+        return randomAccessFile.read();
+      }
+
+      // Delegate bulk reads to the file; the inherited InputStream version
+      // would call read() once per byte.
+      @Override
+      public int read(byte[] bytes, int start, int len) throws IOException {
+        return randomAccessFile.read(bytes, start, len);
+      }
+
+      @Override
+      public long getPos() throws IOException {
+        return randomAccessFile.getFilePointer();
+      }
+
+      @Override
+      public void seek(long newPos) throws IOException {
+        randomAccessFile.seek(newPos);
+      }
+
+      @Override
+      public void readFully(byte[] bytes) throws IOException {
+        randomAccessFile.readFully(bytes);
+      }
+
+      @Override
+      public void readFully(byte[] bytes, int start, int len) throws IOException {
+        randomAccessFile.readFully(bytes, start, len);
+      }
+
+      /**
+       * Reads at most {@code buf.remaining()} bytes into {@code buf} at its
+       * current position, advancing the position by the number of bytes read.
+       *
+       * @return the number of bytes read, or -1 at end of file
+       */
+      @Override
+      public int read(ByteBuffer buf) throws IOException {
+        byte[] buffer = new byte[buf.remaining()];
+        int code = read(buffer);
+        if (code > 0) {
+          // The source offset is relative to the temporary array, so it is 0;
+          // put() itself writes at buf's position. Copy only what was read.
+          buf.put(buffer, 0, code);
+        }
+        return code;
+      }
+
+      /**
+       * Fills the remaining space of {@code buf}, or throws if the file ends
+       * first; in that case {@code buf} is left unmodified.
+       */
+      @Override
+      public void readFully(ByteBuffer buf) throws IOException {
+        byte[] buffer = new byte[buf.remaining()];
+        readFully(buffer);
+        buf.put(buffer);
+      }
+
+      @Override
+      public void close() throws IOException {
+        randomAccessFile.close();
+      }
+    };
+  }
+}
diff --git a/parquet-common/src/main/java/org/apache/parquet/io/LocalOutputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/LocalOutputFile.java
new file mode 100644
index 000000000..5925df988
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/io/LocalOutputFile.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * {@code LocalOutputFile} is an implementation needed by Parquet to write
+ * to local data files using {@link PositionOutputStream} instances.
+ */
+public class LocalOutputFile implements OutputFile {
+
+  // Buffer size used when the caller's hint is zero or negative.
+  private static final int DEFAULT_BUFFER_SIZE = 8192;
+  // Largest buffer that is allocated, however large the caller's hint is.
+  private static final int MAX_BUFFER_SIZE = 8 * 1024 * 1024;
+
+  /**
+   * Buffered stream over a local file that tracks how many bytes have been
+   * written through it.
+   */
+  private static class LocalPositionOutputStream extends PositionOutputStream {
+
+    private final BufferedOutputStream stream;
+    private long pos = 0;
+
+    public LocalPositionOutputStream(Path path, int buffer, StandardOpenOption... openOption) throws IOException {
+      stream = new BufferedOutputStream(Files.newOutputStream(path, openOption), buffer);
+    }
+
+    /**
+     * @return the number of bytes written to this stream so far
+     */
+    @Override
+    public long getPos() {
+      return pos;
+    }
+
+    // Each write advances pos only after the underlying write succeeded, so a
+    // failed write does not leave the reported position ahead of the data.
+    @Override
+    public void write(int data) throws IOException {
+      stream.write(data);
+      pos++;
+    }
+
+    @Override
+    public void write(byte[] data) throws IOException {
+      stream.write(data);
+      pos += data.length;
+    }
+
+    @Override
+    public void write(byte[] data, int off, int len) throws IOException {
+      stream.write(data, off, len);
+      pos += len;
+    }
+
+    @Override
+    public void flush() throws IOException {
+      stream.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+      stream.close();
+    }
+  }
+
+  private final Path path;
+
+  /**
+   * @param file path of the local file to write
+   */
+  public LocalOutputFile(Path file) {
+    path = file;
+  }
+
+  /**
+   * Creates the file and opens a stream to it.
+   *
+   * @param buffer hint for the size of the write buffer, in bytes
+   * @return a new stream positioned at 0
+   * @throws IOException if the file already exists or cannot be opened
+   */
+  @Override
+  public PositionOutputStream create(long buffer) throws IOException {
+    return new LocalPositionOutputStream(path, bufferSize(buffer), StandardOpenOption.CREATE_NEW);
+  }
+
+  /**
+   * Opens a stream to the file, creating it if needed and truncating any
+   * existing content.
+   *
+   * @param buffer hint for the size of the write buffer, in bytes
+   * @return a new stream positioned at 0
+   * @throws IOException if the file cannot be opened
+   */
+  @Override
+  public PositionOutputStream createOrOverwrite(long buffer) throws IOException {
+    return new LocalPositionOutputStream(path, bufferSize(buffer), StandardOpenOption.CREATE,
+      StandardOpenOption.TRUNCATE_EXISTING);
+  }
+
+  @Override
+  public boolean supportsBlockSize() {
+    return true;
+  }
+
+  @Override
+  public long defaultBlockSize() {
+    return 512;
+  }
+
+  @Override
+  public String getPath() {
+    return path.toString();
+  }
+
+  /**
+   * Converts a caller-supplied hint into a usable buffer size. A plain
+   * {@code (int)} cast would wrap around for hints above
+   * {@link Integer#MAX_VALUE}, and {@link BufferedOutputStream} rejects
+   * sizes that are not positive.
+   */
+  private static int bufferSize(long hint) {
+    if (hint <= 0) {
+      return DEFAULT_BUFFER_SIZE;
+    }
+    return (int) Math.min(hint, MAX_BUFFER_SIZE);
+  }
+}
diff --git a/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java
index 77331758b..fede91361 100644
--- a/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java
+++ b/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java
@@ -21,16 +21,41 @@ package org.apache.parquet.io;
 
 import java.io.IOException;
 
+/**
+ * {@code OutputFile} is an interface with the methods needed by Parquet to write
+ * data files using {@link PositionOutputStream} instances.
+ */
 public interface OutputFile {
 
+  /**
+   * Opens a new {@link PositionOutputStream} for the data file to create.
+   *
+   * @return a new {@link PositionOutputStream} to write the file
+   * @throws IOException if the stream cannot be opened
+   */
   PositionOutputStream create(long blockSizeHint) throws IOException;
 
+  /**
+   * Opens a new {@link PositionOutputStream} for the data file to create or overwrite.
+   *
+   * @return a new {@link PositionOutputStream} to write the file
+   * @throws IOException if the stream cannot be opened
+   */
   PositionOutputStream createOrOverwrite(long blockSizeHint) throws 
IOException;
 
+  /**
+   * @return a flag indicating if block size is supported.
+   */
   boolean supportsBlockSize();
 
+  /**
+   * @return the default block size.
+   */
   long defaultBlockSize();
 
+  /**
+   * @return the path of the file, as a {@link String}.
+   */
   default String getPath() {
     return null;
   }
diff --git a/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java b/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java
new file mode 100644
index 000000000..d13cc7f71
--- /dev/null
+++ b/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java
@@ -0,0 +1,103 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.parquet.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+/**
+ * Round-trip tests for {@link LocalOutputFile} and {@link LocalInputFile}.
+ */
+public class TestLocalInputOutput {
+
+  @Test
+  public void outputFileOverwritesFile() throws IOException {
+    Path path = Paths.get(createTempFile().getPath());
+    OutputFile write = new LocalOutputFile(path);
+    try (PositionOutputStream stream = write.createOrOverwrite(512)) {
+      stream.write(124);
+      stream.write(125);
+    }
+    // The second pass is shorter and uses a different value, so leftover
+    // bytes from the first pass would be visible if it were not truncated.
+    try (PositionOutputStream stream = write.createOrOverwrite(512)) {
+      stream.write(126);
+    }
+    InputFile read = new LocalInputFile(path);
+    try (SeekableInputStream stream = read.newStream()) {
+      assertEquals(126, stream.read());
+      assertEquals(-1, stream.read());
+    }
+  }
+
+  @Test
+  public void outputFileCreateFailsAsFileAlreadyExists() throws IOException {
+    Path path = Paths.get(createTempFile().getPath());
+    OutputFile write = new LocalOutputFile(path);
+    write.create(512).close();
+    assertThrows(FileAlreadyExistsException.class, () -> write.create(512).close());
+  }
+
+  @Test
+  public void outputFileCreatesFileWithOverwrite() throws IOException {
+    Path path = Paths.get(createTempFile().getPath());
+    OutputFile write = new LocalOutputFile(path);
+    try (PositionOutputStream stream = write.createOrOverwrite(512)) {
+      stream.write(255);
+    }
+    InputFile read = new LocalInputFile(path);
+    try (SeekableInputStream stream = read.newStream()) {
+      assertEquals(255, stream.read());
+      assertEquals(-1, stream.read());
+    }
+  }
+
+  @Test
+  public void outputFileCreatesFile() throws IOException {
+    Path path = Paths.get(createTempFile().getPath());
+    OutputFile write = new LocalOutputFile(path);
+    // Uses create() rather than createOrOverwrite(), which the test above covers.
+    try (PositionOutputStream stream = write.create(512)) {
+      stream.write(2);
+    }
+    InputFile read = new LocalInputFile(path);
+    try (SeekableInputStream stream = read.newStream()) {
+      assertEquals(2, stream.read());
+      assertEquals(-1, stream.read());
+    }
+  }
+
+  /**
+   * Reserves a unique temporary path and removes the file again, so the path
+   * returned does not exist when the test starts.
+   */
+  private File createTempFile() throws IOException {
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    return tmp;
+  }
+}

Reply via email to