This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new ed54952b5 Core: Use memory-backed streams in tests (#4534)
ed54952b5 is described below
commit ed54952b51a6cdc7d2350b8223c276421845a8e2
Author: Piotr Findeisen <[email protected]>
AuthorDate: Thu Apr 21 21:51:39 2022 +0200
Core: Use memory-backed streams in tests (#4534)
Sometimes a test needs an actual file, but often it does not. Where it
does not, the test can use the new in-memory `InputFile` and `OutputFile`
implementations instead.
---
.../apache/iceberg/TestManifestListVersions.java | 3 +-
.../apache/iceberg/TestManifestWriterVersions.java | 3 +-
.../iceberg/TestScansAndSchemaEvolution.java | 15 +--
.../apache/iceberg/avro/TestAvroDeleteWriters.java | 9 +-
.../org/apache/iceberg/avro/TestGenericAvro.java | 13 +-
.../org/apache/iceberg/io/InMemoryInputFile.java | 131 +++++++++++++++++++++
.../org/apache/iceberg/io/InMemoryOutputFile.java | 112 ++++++++++++++++++
7 files changed, 263 insertions(+), 23 deletions(-)
diff --git
a/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java
b/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java
index 390ba0de1..8cc64f078 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.InMemoryOutputFile;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -251,7 +252,7 @@ public class TestManifestListVersions {
}
private InputFile writeManifestList(ManifestFile manifest, int
formatVersion) throws IOException {
- OutputFile manifestList = Files.localOutput(temp.newFile());
+ OutputFile manifestList = new InMemoryOutputFile();
try (FileAppender<ManifestFile> writer = ManifestLists.write(
formatVersion, manifestList, SNAPSHOT_ID, SNAPSHOT_ID - 1,
formatVersion > 1 ? SEQ_NUM : 0)) {
writer.add(manifest);
diff --git
a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
index c9230315e..79aa35db9 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InMemoryOutputFile;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -235,7 +236,7 @@ public class TestManifestWriterVersions {
}
private InputFile writeManifestList(ManifestFile manifest, int
formatVersion) throws IOException {
- OutputFile manifestList = Files.localOutput(temp.newFile());
+ OutputFile manifestList = new InMemoryOutputFile();
try (FileAppender<ManifestFile> writer = ManifestLists.write(
formatVersion, manifestList, SNAPSHOT_ID, SNAPSHOT_ID - 1,
formatVersion > 1 ? SEQUENCE_NUMBER : 0)) {
writer.add(manifest);
diff --git
a/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java
b/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java
index 0ae594401..5a60ece7b 100644
--- a/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java
+++ b/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java
@@ -28,6 +28,8 @@ import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.RandomAvroData;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.InMemoryOutputFile;
+import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.junit.After;
@@ -65,11 +67,11 @@ public class TestScansAndSchemaEvolution {
@Rule
public TemporaryFolder temp = new TemporaryFolder();
- private DataFile createDataFile(File dataPath, String partValue) throws
IOException {
+ private DataFile createDataFile(String partValue) throws IOException {
List<GenericData.Record> expected = RandomAvroData.generate(SCHEMA, 100,
0L);
- File dataFile = new File(dataPath,
FileFormat.AVRO.addExtension(UUID.randomUUID().toString()));
- try (FileAppender<GenericData.Record> writer =
Avro.write(Files.localOutput(dataFile))
+ OutputFile dataFile = new
InMemoryOutputFile(FileFormat.AVRO.addExtension(UUID.randomUUID().toString()));
+ try (FileAppender<GenericData.Record> writer = Avro.write(dataFile)
.schema(SCHEMA)
.named("test")
.build()) {
@@ -82,7 +84,7 @@ public class TestScansAndSchemaEvolution {
PartitionData partition = new PartitionData(SPEC.partitionType());
partition.set(0, partValue);
return DataFiles.builder(SPEC)
- .withInputFile(Files.localInput(dataFile))
+ .withInputFile(dataFile.toInputFile())
.withPartition(partition)
.withRecordCount(100)
.build();
@@ -96,13 +98,12 @@ public class TestScansAndSchemaEvolution {
@Test
public void testPartitionSourceRename() throws IOException {
File location = temp.newFolder();
- File dataLocation = new File(location, "data");
Assert.assertTrue(location.delete()); // should be created by table create
Table table = TestTables.create(location, "test", SCHEMA, SPEC,
formatVersion);
- DataFile fileOne = createDataFile(dataLocation, "one");
- DataFile fileTwo = createDataFile(dataLocation, "two");
+ DataFile fileOne = createDataFile("one");
+ DataFile fileTwo = createDataFile("two");
table.newAppend()
.appendFile(fileOne)
diff --git
a/core/src/test/java/org/apache/iceberg/avro/TestAvroDeleteWriters.java
b/core/src/test/java/org/apache/iceberg/avro/TestAvroDeleteWriters.java
index dc6113922..d772f54dd 100644
--- a/core/src/test/java/org/apache/iceberg/avro/TestAvroDeleteWriters.java
+++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroDeleteWriters.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.data.avro.DataReader;
import org.apache.iceberg.data.avro.DataWriter;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.io.InMemoryOutputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -73,9 +74,7 @@ public class TestAvroDeleteWriters {
@Test
public void testEqualityDeleteWriter() throws IOException {
- File deleteFile = temp.newFile();
-
- OutputFile out = Files.localOutput(deleteFile);
+ OutputFile out = new InMemoryOutputFile();
EqualityDeleteWriter<Record> deleteWriter = Avro.writeDeletes(out)
.createWriterFunc(DataWriter::create)
.overwrite()
@@ -108,8 +107,6 @@ public class TestAvroDeleteWriters {
@Test
public void testPositionDeleteWriter() throws IOException {
- File deleteFile = temp.newFile();
-
Schema deleteSchema = new Schema(
MetadataColumns.DELETE_FILE_PATH,
MetadataColumns.DELETE_FILE_POS,
@@ -119,7 +116,7 @@ public class TestAvroDeleteWriters {
GenericRecord posDelete = GenericRecord.create(deleteSchema);
List<Record> expectedDeleteRecords = Lists.newArrayList();
- OutputFile out = Files.localOutput(deleteFile);
+ OutputFile out = new InMemoryOutputFile();
PositionDeleteWriter<Record> deleteWriter = Avro.writeDeletes(out)
.createWriterFunc(DataWriter::create)
.overwrite()
diff --git a/core/src/test/java/org/apache/iceberg/avro/TestGenericAvro.java
b/core/src/test/java/org/apache/iceberg/avro/TestGenericAvro.java
index 6849657dd..f88c89127 100644
--- a/core/src/test/java/org/apache/iceberg/avro/TestGenericAvro.java
+++ b/core/src/test/java/org/apache/iceberg/avro/TestGenericAvro.java
@@ -19,25 +19,22 @@
package org.apache.iceberg.avro;
-import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.avro.generic.GenericData.Record;
-import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.InMemoryOutputFile;
+import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.junit.Assert;
public class TestGenericAvro extends AvroDataTest {
@Override
protected void writeAndValidate(Schema schema) throws IOException {
List<Record> expected = RandomAvroData.generate(schema, 100, 0L);
- File testFile = temp.newFile();
- Assert.assertTrue("Delete should succeed", testFile.delete());
-
- try (FileAppender<Record> writer = Avro.write(Files.localOutput(testFile))
+ OutputFile outputFile = new InMemoryOutputFile();
+ try (FileAppender<Record> writer = Avro.write(outputFile)
.schema(schema)
.named("test")
.build()) {
@@ -47,7 +44,7 @@ public class TestGenericAvro extends AvroDataTest {
}
List<Record> rows;
- try (AvroIterable<Record> reader = Avro.read(Files.localInput(testFile))
+ try (AvroIterable<Record> reader = Avro.read(outputFile.toInputFile())
.project(schema)
.build()) {
rows = Lists.newArrayList(reader);
diff --git a/core/src/test/java/org/apache/iceberg/io/InMemoryInputFile.java
b/core/src/test/java/org/apache/iceberg/io/InMemoryInputFile.java
new file mode 100644
index 000000000..f25b21bac
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/io/InMemoryInputFile.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.UUID;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class InMemoryInputFile implements InputFile {
+
+ private final String location;
+ private final byte[] contents;
+
+ public InMemoryInputFile(byte[] contents) {
+ this("memory:" + UUID.randomUUID(), contents);
+ }
+
+ public InMemoryInputFile(String location, byte[] contents) {
+ Preconditions.checkNotNull(location, "location is null");
+ Preconditions.checkNotNull(contents, "contents is null");
+ this.location = location;
+ this.contents = contents.clone();
+ }
+
+ @Override
+ public long getLength() {
+ return contents.length;
+ }
+
+ @Override
+ public SeekableInputStream newStream() {
+ return new InMemorySeekableInputStream(contents);
+ }
+
+ @Override
+ public String location() {
+ return location;
+ }
+
+ @Override
+ public boolean exists() {
+ return true;
+ }
+
+ private static class InMemorySeekableInputStream extends SeekableInputStream
{
+
+ private final int length;
+ private final ByteArrayInputStream delegate;
+
+ InMemorySeekableInputStream(byte[] contents) {
+ this.length = contents.length;
+ this.delegate = new ByteArrayInputStream(contents);
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return length - delegate.available();
+ }
+
+ @Override
+ public void seek(long newPos) throws IOException {
+ delegate.reset(); // resets to a marked position
+ Preconditions.checkState(delegate.skip(newPos) == newPos,
+ "Invalid position %s within stream of length %s", newPos, length);
+ }
+
+ @Override
+ public int read() {
+ return delegate.read();
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ return delegate.read(b);
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) {
+ return delegate.read(b, off, len);
+ }
+
+ @Override
+ public long skip(long n) {
+ return delegate.skip(n);
+ }
+
+ @Override
+ public int available() {
+ return delegate.available();
+ }
+
+ @Override
+ public boolean markSupported() {
+ return false;
+ }
+
+ @Override
+ public void mark(int readAheadLimit) {
+ // The delegate's mark is used to implement seek
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void reset() {
+ delegate.reset();
+ }
+
+ @Override
+ public void close() throws IOException {
+ delegate.close();
+ }
+ }
+}
diff --git a/core/src/test/java/org/apache/iceberg/io/InMemoryOutputFile.java
b/core/src/test/java/org/apache/iceberg/io/InMemoryOutputFile.java
new file mode 100644
index 000000000..c64ec00c6
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/io/InMemoryOutputFile.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.UUID;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * An {@link OutputFile} that keeps everything written to it in memory.
+ * <p>
+ * Useful in tests that do not need a real file. Each call to {@link #create()} or
+ * {@link #createOrOverwrite()} starts from empty contents; {@link #toByteArray()} and
+ * {@link #toInputFile()} return a copy of the bytes written so far.
+ */
+public class InMemoryOutputFile implements OutputFile {
+
+  private final String location;
+
+  private boolean exists = false;
+  private ByteArrayOutputStream contents;
+
+  /** Creates an output file with a random {@code memory:} location. */
+  public InMemoryOutputFile() {
+    this("memory:" + UUID.randomUUID());
+  }
+
+  /**
+   * Creates an output file reported at the given location.
+   *
+   * @param location the value returned by {@link #location()}; must not be null
+   */
+  public InMemoryOutputFile(String location) {
+    Preconditions.checkNotNull(location, "location is null");
+    this.location = location;
+  }
+
+  /**
+   * Creates the file and returns a stream for writing it.
+   *
+   * @throws AlreadyExistsException if this instance was already created
+   */
+  @Override
+  public PositionOutputStream create() {
+    if (exists) {
+      throw new AlreadyExistsException("Already exists: %s", location);
+    }
+    return createOrOverwrite();
+  }
+
+  /** Creates the file, discarding any bytes written by a previously returned stream. */
+  @Override
+  public PositionOutputStream createOrOverwrite() {
+    exists = true;
+    contents = new ByteArrayOutputStream();
+    return new InMemoryPositionOutputStream(contents);
+  }
+
+  @Override
+  public String location() {
+    return location;
+  }
+
+  /**
+   * Returns an {@link InputFile} over a snapshot of the bytes written so far.
+   *
+   * @throws IllegalStateException if the file has not been created yet
+   */
+  @Override
+  public InputFile toInputFile() {
+    Preconditions.checkState(exists, "Cannot convert a file that has not been written yet");
+    return new InMemoryInputFile(location(), toByteArray());
+  }
+
+  /**
+   * Returns a copy of the bytes written so far.
+   *
+   * @throws IllegalStateException if the file has not been created yet
+   */
+  public byte[] toByteArray() {
+    // Without this check an unwritten file would fail with a bare NullPointerException.
+    Preconditions.checkState(exists, "Cannot get contents of a file that has not been written yet");
+    return contents.toByteArray();
+  }
+
+  /**
+   * A {@link PositionOutputStream} that appends to a {@link ByteArrayOutputStream}; the position
+   * is the number of bytes written so far.
+   */
+  private static class InMemoryPositionOutputStream extends PositionOutputStream {
+    private final ByteArrayOutputStream delegate;
+
+    InMemoryPositionOutputStream(ByteArrayOutputStream delegate) {
+      Preconditions.checkNotNull(delegate, "delegate is null");
+      this.delegate = delegate;
+    }
+
+    @Override
+    public long getPos() {
+      return delegate.size();
+    }
+
+    @Override
+    public void write(int b) {
+      delegate.write(b);
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+      delegate.write(b);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) {
+      delegate.write(b, off, len);
+    }
+
+    @Override
+    public void flush() throws IOException {
+      delegate.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+      delegate.close();
+    }
+  }
+}