This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 457a1f1991 Add InMemoryFileIO as a test helper class (#6538)
457a1f1991 is described below

commit 457a1f1991a1d01faa074796cc476f6f0b7846cd
Author: Dennis Huo <[email protected]>
AuthorDate: Mon Jan 9 13:27:10 2023 -0800

    Add InMemoryFileIO as a test helper class (#6538)
    
    * Add InMemoryFileIO alongside existing InMemoryOutputFile and
    InMemoryInputFile as a test helper class stitching the two together
    and maintaining an in-memory listing of files. Add a dedicated
    unittest for the new test helper.
    
    * Update variable naming for consistency, refactor to avoid using containsKey
    
    * Use Maps.newConcurrentMap instead of Maps.newHashMap
---
 .../java/org/apache/iceberg/io/InMemoryFileIO.java |  72 +++++++++++++
 .../org/apache/iceberg/io/InMemoryOutputFile.java  |  26 ++++-
 .../org/apache/iceberg/io/TestInMemoryFileIO.java  | 111 +++++++++++++++++++++
 3 files changed, 207 insertions(+), 2 deletions(-)

diff --git a/core/src/test/java/org/apache/iceberg/io/InMemoryFileIO.java b/core/src/test/java/org/apache/iceberg/io/InMemoryFileIO.java
new file mode 100644
index 0000000000..41756043ff
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/io/InMemoryFileIO.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import java.util.Map;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class InMemoryFileIO implements FileIO {
+
+  private Map<String, byte[]> inMemoryFiles = Maps.newConcurrentMap();
+  private boolean closed = false;
+
+  public void addFile(String location, byte[] contents) {
+    Preconditions.checkState(!closed, "Cannot call addFile after calling 
close()");
+    inMemoryFiles.put(location, contents);
+  }
+
+  public boolean fileExists(String location) {
+    return inMemoryFiles.containsKey(location);
+  }
+
+  @Override
+  public InputFile newInputFile(String location) {
+    Preconditions.checkState(!closed, "Cannot call newInputFile after calling 
close()");
+    byte[] contents = inMemoryFiles.get(location);
+    if (null == contents) {
+      throw new NotFoundException("No in-memory file found for location: %s", 
location);
+    }
+    return new InMemoryInputFile(location, contents);
+  }
+
+  @Override
+  public OutputFile newOutputFile(String location) {
+    Preconditions.checkState(!closed, "Cannot call newOutputFile after calling 
close()");
+    return new InMemoryOutputFile(location, this);
+  }
+
+  @Override
+  public void deleteFile(String location) {
+    Preconditions.checkState(!closed, "Cannot call deleteFile after calling 
close()");
+    if (null == inMemoryFiles.remove(location)) {
+      throw new NotFoundException("No in-memory file found for location: %s", 
location);
+    }
+  }
+
+  public boolean isClosed() {
+    return closed;
+  }
+
+  @Override
+  public void close() {
+    closed = true;
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/io/InMemoryOutputFile.java b/core/src/test/java/org/apache/iceberg/io/InMemoryOutputFile.java
index e8740b125f..5d72cef622 100644
--- a/core/src/test/java/org/apache/iceberg/io/InMemoryOutputFile.java
+++ b/core/src/test/java/org/apache/iceberg/io/InMemoryOutputFile.java
@@ -30,19 +30,38 @@ public class InMemoryOutputFile implements OutputFile {
 
   private boolean exists = false;
   private ByteArrayOutputStream contents;
+  private InMemoryFileIO parentFileIO;
 
   public InMemoryOutputFile() {
     this("memory:" + UUID.randomUUID());
   }
 
   public InMemoryOutputFile(String location) {
+    this(location, null);
+  }
+
+  /**
+   * If the optional parentFileIO is provided, file-existence behaves similarly to S3FileIO;
+   * existence checks are performed up-front if creating without overwrite, but files only exist in
+   * the parentFileIO if close() has been called on the associated output streams (or pre-existing
+   * files are populated into the parentFileIO through other means).
+   *
+   * @param location the location returned by location() of this OutputFile, the InputFile obtained
+   *     from calling toInputFile(), and the location for looking up the associated InputFile from a
+   *     parentFileIO, if non-null.
+   * @param parentFileIO if non-null, commits an associated InMemoryInputFile on close() into the
+   *     parentFileIO, and uses the parentFileIO for "already exists" checks if creating without
+   *     overwriting.
+   */
+  public InMemoryOutputFile(String location, InMemoryFileIO parentFileIO) {
     Preconditions.checkNotNull(location, "location is null");
     this.location = location;
+    this.parentFileIO = parentFileIO;
   }
 
   @Override
   public PositionOutputStream create() {
-    if (exists) {
+    if (exists || (parentFileIO != null && parentFileIO.fileExists(location))) {
       throw new AlreadyExistsException("Already exists");
     }
     return createOrOverwrite();
@@ -70,7 +89,7 @@ public class InMemoryOutputFile implements OutputFile {
     return contents.toByteArray();
   }
 
-  private static class InMemoryPositionOutputStream extends PositionOutputStream {
+  private class InMemoryPositionOutputStream extends PositionOutputStream {
     private final ByteArrayOutputStream delegate;
     private boolean closed = false;
 
@@ -112,6 +131,9 @@ public class InMemoryOutputFile implements OutputFile {
     public void close() throws IOException {
       delegate.close();
       closed = true;
+      if (parentFileIO != null) {
+        parentFileIO.addFile(location(), toByteArray());
+      }
     }
 
     private void checkOpen() {
diff --git a/core/src/test/java/org/apache/iceberg/io/TestInMemoryFileIO.java b/core/src/test/java/org/apache/iceberg/io/TestInMemoryFileIO.java
new file mode 100644
index 0000000000..95118ec7d1
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/io/TestInMemoryFileIO.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+/** Unit tests for {@link InMemoryFileIO}: create, read back, overwrite, delete, not-found. */
+public class TestInMemoryFileIO {
+  String location = "s3://foo/bar.txt";
+
+  /**
+   * Reads {@code length} bytes from the file registered at {@code path} and decodes them as
+   * UTF-8.
+   */
+  private static String readString(InMemoryFileIO fileIO, String path, int length)
+      throws IOException {
+    byte[] buf = new byte[length];
+    try (InputStream inputStream = fileIO.newInputFile(path).newStream()) {
+      // InputStream.read may return fewer bytes than requested, so loop until the buffer is
+      // full or the stream ends.
+      int offset = 0;
+      while (offset < length) {
+        int bytesRead = inputStream.read(buf, offset, length - offset);
+        if (bytesRead < 0) {
+          break;
+        }
+        offset += bytesRead;
+      }
+    }
+    return new String(buf, StandardCharsets.UTF_8);
+  }
+
+  /** Writes a file, reads it back, then deletes it, checking existence at each step. */
+  @Test
+  public void testBasicEndToEnd() throws IOException {
+    InMemoryFileIO fileIO = new InMemoryFileIO();
+    Assertions.assertThat(fileIO.fileExists(location)).isFalse();
+
+    OutputStream outputStream = fileIO.newOutputFile(location).create();
+    byte[] data = "hello world".getBytes(StandardCharsets.UTF_8);
+    outputStream.write(data);
+    outputStream.close();
+    Assertions.assertThat(fileIO.fileExists(location)).isTrue();
+
+    Assertions.assertThat(readString(fileIO, location, data.length)).isEqualTo("hello world");
+
+    fileIO.deleteFile(location);
+    Assertions.assertThat(fileIO.fileExists(location)).isFalse();
+  }
+
+  /** Opening an unregistered location must fail with NotFoundException. */
+  @Test
+  public void testNewInputFileNotFound() throws IOException {
+    InMemoryFileIO fileIO = new InMemoryFileIO();
+    Assertions.assertThatExceptionOfType(NotFoundException.class)
+        .isThrownBy(() -> fileIO.newInputFile("s3://nonexistent/file"));
+  }
+
+  /** Deleting an unregistered location must fail with NotFoundException. */
+  @Test
+  public void testDeleteFileNotFound() throws IOException {
+    InMemoryFileIO fileIO = new InMemoryFileIO();
+    Assertions.assertThatExceptionOfType(NotFoundException.class)
+        .isThrownBy(() -> fileIO.deleteFile("s3://nonexistent/file"));
+  }
+
+  /** create() without overwrite must fail when the location is already registered. */
+  @Test
+  public void testCreateNoOverwrite() throws IOException {
+    InMemoryFileIO fileIO = new InMemoryFileIO();
+    fileIO.addFile(location, "hello world".getBytes(StandardCharsets.UTF_8));
+    Assertions.assertThatExceptionOfType(AlreadyExistsException.class)
+        .isThrownBy(() -> fileIO.newOutputFile(location).create());
+  }
+
+  /** Written data only becomes visible in the FileIO once the output stream is closed. */
+  @Test
+  public void testOverwriteBeforeAndAfterClose() throws IOException {
+    byte[] oldData = "old data".getBytes(StandardCharsets.UTF_8);
+    byte[] newData = "new data".getBytes(StandardCharsets.UTF_8);
+
+    InMemoryFileIO fileIO = new InMemoryFileIO();
+    OutputStream outputStream = fileIO.newOutputFile(location).create();
+    outputStream.write(oldData);
+
+    // Even though we've called create() and started writing data, this file won't yet exist
+    // in the parentFileIO before we've closed it.
+    Assertions.assertThat(fileIO.fileExists(location)).isFalse();
+
+    // File appears after closing it.
+    outputStream.close();
+    Assertions.assertThat(fileIO.fileExists(location)).isTrue();
+
+    // Start a new OutputFile and write new data but don't close() it yet.
+    outputStream = fileIO.newOutputFile(location).createOrOverwrite();
+    outputStream.write(newData);
+
+    // We'll still read old data.
+    Assertions.assertThat(readString(fileIO, location, oldData.length)).isEqualTo("old data");
+
+    // Finally, close the new output stream; data should be overwritten with new data now.
+    outputStream.close();
+    Assertions.assertThat(readString(fileIO, location, newData.length)).isEqualTo("new data");
+  }
+}

Reply via email to