This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new ed54952b5 Core: Use memory-backed streams in tests (#4534)
ed54952b5 is described below

commit ed54952b51a6cdc7d2350b8223c276421845a8e2
Author: Piotr Findeisen <[email protected]>
AuthorDate: Thu Apr 21 21:51:39 2022 +0200

    Core: Use memory-backed streams in tests (#4534)
    
    Sometimes an actual file is needed, but sometimes it's not. In the case
    where it isn't, we can use an in-memory `InputFile`, `OutputFile`
    implementation.
---
 .../apache/iceberg/TestManifestListVersions.java   |   3 +-
 .../apache/iceberg/TestManifestWriterVersions.java |   3 +-
 .../iceberg/TestScansAndSchemaEvolution.java       |  15 +--
 .../apache/iceberg/avro/TestAvroDeleteWriters.java |   9 +-
 .../org/apache/iceberg/avro/TestGenericAvro.java   |  13 +-
 .../org/apache/iceberg/io/InMemoryInputFile.java   | 131 +++++++++++++++++++++
 .../org/apache/iceberg/io/InMemoryOutputFile.java  | 112 ++++++++++++++++++
 7 files changed, 263 insertions(+), 23 deletions(-)

diff --git a/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java b/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java
index 390ba0de1..8cc64f078 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.InMemoryOutputFile;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -251,7 +252,7 @@ public class TestManifestListVersions {
   }
 
   private InputFile writeManifestList(ManifestFile manifest, int formatVersion) throws IOException {
-    OutputFile manifestList = Files.localOutput(temp.newFile());
+    OutputFile manifestList = new InMemoryOutputFile();
     try (FileAppender<ManifestFile> writer = ManifestLists.write(
         formatVersion, manifestList, SNAPSHOT_ID, SNAPSHOT_ID - 1, formatVersion > 1 ? SEQ_NUM : 0)) {
       writer.add(manifest);
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
index c9230315e..79aa35db9 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InMemoryOutputFile;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -235,7 +236,7 @@ public class TestManifestWriterVersions {
   }
 
   private InputFile writeManifestList(ManifestFile manifest, int formatVersion) throws IOException {
-    OutputFile manifestList = Files.localOutput(temp.newFile());
+    OutputFile manifestList = new InMemoryOutputFile();
     try (FileAppender<ManifestFile> writer = ManifestLists.write(
         formatVersion, manifestList, SNAPSHOT_ID, SNAPSHOT_ID - 1, formatVersion > 1 ? SEQUENCE_NUMBER : 0)) {
       writer.add(manifest);
diff --git a/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java b/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java
index 0ae594401..5a60ece7b 100644
--- a/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java
+++ b/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java
@@ -28,6 +28,8 @@ import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.avro.RandomAvroData;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.InMemoryOutputFile;
+import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
 import org.junit.After;
@@ -65,11 +67,11 @@ public class TestScansAndSchemaEvolution {
   @Rule
   public TemporaryFolder temp = new TemporaryFolder();
 
-  private DataFile createDataFile(File dataPath, String partValue) throws IOException {
+  private DataFile createDataFile(String partValue) throws IOException {
     List<GenericData.Record> expected = RandomAvroData.generate(SCHEMA, 100, 0L);
 
-    File dataFile = new File(dataPath, FileFormat.AVRO.addExtension(UUID.randomUUID().toString()));
-    try (FileAppender<GenericData.Record> writer = Avro.write(Files.localOutput(dataFile))
+    OutputFile dataFile = new InMemoryOutputFile(FileFormat.AVRO.addExtension(UUID.randomUUID().toString()));
+    try (FileAppender<GenericData.Record> writer = Avro.write(dataFile)
         .schema(SCHEMA)
         .named("test")
         .build()) {
@@ -82,7 +84,7 @@ public class TestScansAndSchemaEvolution {
     PartitionData partition = new PartitionData(SPEC.partitionType());
     partition.set(0, partValue);
     return DataFiles.builder(SPEC)
-        .withInputFile(Files.localInput(dataFile))
+        .withInputFile(dataFile.toInputFile())
         .withPartition(partition)
         .withRecordCount(100)
         .build();
@@ -96,13 +98,12 @@ public class TestScansAndSchemaEvolution {
   @Test
   public void testPartitionSourceRename() throws IOException {
     File location = temp.newFolder();
-    File dataLocation = new File(location, "data");
     Assert.assertTrue(location.delete()); // should be created by table create
 
     Table table = TestTables.create(location, "test", SCHEMA, SPEC, formatVersion);
 
-    DataFile fileOne = createDataFile(dataLocation, "one");
-    DataFile fileTwo = createDataFile(dataLocation, "two");
+    DataFile fileOne = createDataFile("one");
+    DataFile fileTwo = createDataFile("two");
 
     table.newAppend()
         .appendFile(fileOne)
diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroDeleteWriters.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroDeleteWriters.java
index dc6113922..d772f54dd 100644
--- a/core/src/test/java/org/apache/iceberg/avro/TestAvroDeleteWriters.java
+++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroDeleteWriters.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.data.avro.DataReader;
 import org.apache.iceberg.data.avro.DataWriter;
 import org.apache.iceberg.deletes.EqualityDeleteWriter;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.io.InMemoryOutputFile;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -73,9 +74,7 @@ public class TestAvroDeleteWriters {
 
   @Test
   public void testEqualityDeleteWriter() throws IOException {
-    File deleteFile = temp.newFile();
-
-    OutputFile out = Files.localOutput(deleteFile);
+    OutputFile out = new InMemoryOutputFile();
     EqualityDeleteWriter<Record> deleteWriter = Avro.writeDeletes(out)
         .createWriterFunc(DataWriter::create)
         .overwrite()
@@ -108,8 +107,6 @@ public class TestAvroDeleteWriters {
 
   @Test
   public void testPositionDeleteWriter() throws IOException {
-    File deleteFile = temp.newFile();
-
     Schema deleteSchema = new Schema(
         MetadataColumns.DELETE_FILE_PATH,
         MetadataColumns.DELETE_FILE_POS,
@@ -119,7 +116,7 @@ public class TestAvroDeleteWriters {
     GenericRecord posDelete = GenericRecord.create(deleteSchema);
     List<Record> expectedDeleteRecords = Lists.newArrayList();
 
-    OutputFile out = Files.localOutput(deleteFile);
+    OutputFile out = new InMemoryOutputFile();
     PositionDeleteWriter<Record> deleteWriter = Avro.writeDeletes(out)
         .createWriterFunc(DataWriter::create)
         .overwrite()
diff --git a/core/src/test/java/org/apache/iceberg/avro/TestGenericAvro.java b/core/src/test/java/org/apache/iceberg/avro/TestGenericAvro.java
index 6849657dd..f88c89127 100644
--- a/core/src/test/java/org/apache/iceberg/avro/TestGenericAvro.java
+++ b/core/src/test/java/org/apache/iceberg/avro/TestGenericAvro.java
@@ -19,25 +19,22 @@
 
 package org.apache.iceberg.avro;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import org.apache.avro.generic.GenericData.Record;
-import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.InMemoryOutputFile;
+import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.junit.Assert;
 
 public class TestGenericAvro extends AvroDataTest {
   @Override
   protected void writeAndValidate(Schema schema) throws IOException {
     List<Record> expected = RandomAvroData.generate(schema, 100, 0L);
 
-    File testFile = temp.newFile();
-    Assert.assertTrue("Delete should succeed", testFile.delete());
-
-    try (FileAppender<Record> writer = Avro.write(Files.localOutput(testFile))
+    OutputFile outputFile = new InMemoryOutputFile();
+    try (FileAppender<Record> writer = Avro.write(outputFile)
         .schema(schema)
         .named("test")
         .build()) {
@@ -47,7 +44,7 @@ public class TestGenericAvro extends AvroDataTest {
     }
 
     List<Record> rows;
-    try (AvroIterable<Record> reader = Avro.read(Files.localInput(testFile))
+    try (AvroIterable<Record> reader = Avro.read(outputFile.toInputFile())
         .project(schema)
         .build()) {
       rows = Lists.newArrayList(reader);
diff --git a/core/src/test/java/org/apache/iceberg/io/InMemoryInputFile.java b/core/src/test/java/org/apache/iceberg/io/InMemoryInputFile.java
new file mode 100644
index 000000000..f25b21bac
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/io/InMemoryInputFile.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.UUID;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class InMemoryInputFile implements InputFile {
+
+  private final String location;
+  private final byte[] contents;
+
+  public InMemoryInputFile(byte[] contents) {
+    this("memory:" + UUID.randomUUID(), contents);
+  }
+
+  public InMemoryInputFile(String location, byte[] contents) {
+    Preconditions.checkNotNull(location, "location is null");
+    Preconditions.checkNotNull(contents, "contents is null");
+    this.location = location;
+    this.contents = contents.clone();
+  }
+
+  @Override
+  public long getLength() {
+    return contents.length;
+  }
+
+  @Override
+  public SeekableInputStream newStream() {
+    return new InMemorySeekableInputStream(contents);
+  }
+
+  @Override
+  public String location() {
+    return location;
+  }
+
+  @Override
+  public boolean exists() {
+    return true;
+  }
+
+  private static class InMemorySeekableInputStream extends SeekableInputStream {
+
+    private final int length;
+    private final ByteArrayInputStream delegate;
+
+    InMemorySeekableInputStream(byte[] contents) {
+      this.length = contents.length;
+      this.delegate = new ByteArrayInputStream(contents);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return length - delegate.available();
+    }
+
+    @Override
+    public void seek(long newPos) throws IOException {
+      delegate.reset(); // resets to a marked position
+      Preconditions.checkState(delegate.skip(newPos) == newPos,
+          "Invalid position %s within stream of length %s", newPos, length);
+    }
+
+    @Override
+    public int read() {
+      return delegate.read();
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+      return delegate.read(b);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) {
+      return delegate.read(b, off, len);
+    }
+
+    @Override
+    public long skip(long n) {
+      return delegate.skip(n);
+    }
+
+    @Override
+    public int available() {
+      return delegate.available();
+    }
+
+    @Override
+    public boolean markSupported() {
+      return false;
+    }
+
+    @Override
+    public void mark(int readAheadLimit) {
+      // The delegate's mark is used to implement seek
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void reset() {
+      delegate.reset();
+    }
+
+    @Override
+    public void close() throws IOException {
+      delegate.close();
+    }
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/io/InMemoryOutputFile.java b/core/src/test/java/org/apache/iceberg/io/InMemoryOutputFile.java
new file mode 100644
index 000000000..c64ec00c6
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/io/InMemoryOutputFile.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.UUID;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class InMemoryOutputFile implements OutputFile {
+
+  private final String location;
+
+  private boolean exists = false;
+  private ByteArrayOutputStream contents;
+
+  public InMemoryOutputFile() {
+    this("memory:" + UUID.randomUUID());
+  }
+
+  public InMemoryOutputFile(String location) {
+    Preconditions.checkNotNull(location, "location is null");
+    this.location = location;
+  }
+
+  @Override
+  public PositionOutputStream create() {
+    if (exists) {
+      throw new AlreadyExistsException("Already exists");
+    }
+    return createOrOverwrite();
+  }
+
+  @Override
+  public PositionOutputStream createOrOverwrite() {
+    exists = true;
+    contents = new ByteArrayOutputStream();
+    return new InMemoryPositionOutputStream(contents);
+  }
+
+  @Override
+  public String location() {
+    return location;
+  }
+
+  @Override
+  public InputFile toInputFile() {
+    Preconditions.checkState(exists, "Cannot convert a file that has not been written yet");
+    return new InMemoryInputFile(location(), toByteArray());
+  }
+
+  public byte[] toByteArray() {
+    return contents.toByteArray();
+  }
+
+  private static class InMemoryPositionOutputStream extends PositionOutputStream {
+    private final ByteArrayOutputStream delegate;
+
+    InMemoryPositionOutputStream(ByteArrayOutputStream delegate) {
+      Preconditions.checkNotNull(delegate, "delegate is null");
+      this.delegate = delegate;
+    }
+
+    @Override
+    public long getPos() {
+      return delegate.size();
+    }
+
+    @Override
+    public void write(int b) {
+      delegate.write(b);
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+      delegate.write(b);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) {
+      delegate.write(b, off, len);
+    }
+
+    @Override
+    public void flush() throws IOException {
+      delegate.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+      delegate.close();
+    }
+  }
+}

Reply via email to