This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new b4d9770a70 ORC: Add FileIO support for readers and writers (#6293)
b4d9770a70 is described below

commit b4d9770a7097491451c03c4dda1479e8d389a898
Author: Pavan Lanka <[email protected]>
AuthorDate: Sun Dec 18 15:55:13 2022 -0800

    ORC: Add FileIO support for readers and writers (#6293)
---
 .../org/apache/iceberg/hadoop/HadoopStreams.java   |  57 ++++++-
 .../java/org/apache/iceberg/orc/FileIOFSUtil.java  | 164 +++++++++++++++++++++
 orc/src/main/java/org/apache/iceberg/orc/ORC.java  |  40 ++++-
 .../org/apache/iceberg/orc/OrcFileAppender.java    |  21 +--
 .../apache/iceberg/orc/TestORCFileIOProxies.java   |  85 +++++++++++
 .../org/apache/iceberg/orc/TestOrcDataWriter.java  | 125 +++++++++++++++-
 6 files changed, 460 insertions(+), 32 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java
index 1936888c5c..44023326a0 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSInputStream;
 import org.apache.iceberg.io.DelegatingInputStream;
 import org.apache.iceberg.io.DelegatingOutputStream;
 import org.apache.iceberg.io.PositionOutputStream;
@@ -38,7 +39,7 @@ import org.slf4j.LoggerFactory;
  *
  * <p>This class is based on Parquet's HadoopStreams.
  */
-class HadoopStreams {
+public class HadoopStreams {
 
   private HadoopStreams() {}
 
@@ -65,6 +66,16 @@ class HadoopStreams {
     return new HadoopPositionOutputStream(stream);
   }
 
+  /**
+   * Wraps a {@link SeekableInputStream} in a {@link FSInputStream} implementation for readers.
+   *
+   * @param stream a SeekableInputStream
+   * @return a FSInputStream
+   */
+  public static FSInputStream wrap(SeekableInputStream stream) {
+    return new WrappedSeekableInputStream(stream);
+  }
+
   /**
   * SeekableInputStream implementation for FSDataInputStream that implements ByteBufferReadable in
    * Hadoop 2.
@@ -190,4 +201,48 @@ class HadoopStreams {
       }
     }
   }
+
+  private static class WrappedSeekableInputStream extends FSInputStream
+      implements DelegatingInputStream {
+    private final SeekableInputStream inputStream;
+
+    private WrappedSeekableInputStream(SeekableInputStream inputStream) {
+      this.inputStream = inputStream;
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+      inputStream.seek(pos);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return inputStream.getPos();
+    }
+
+    @Override
+    public boolean seekToNewSource(long targetPos) throws IOException {
+      throw new UnsupportedOperationException("seekToNewSource not supported");
+    }
+
+    @Override
+    public int read() throws IOException {
+      return inputStream.read();
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      return inputStream.read(b, off, len);
+    }
+
+    @Override
+    public void close() throws IOException {
+      inputStream.close();
+    }
+
+    @Override
+    public InputStream getDelegate() {
+      return inputStream;
+    }
+  }
 }
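
A minimal sketch of how the new public wrapper composes with Hadoop's stream
types (hedged: `inputFile` below is a stand-in for any
org.apache.iceberg.io.InputFile and is not part of this patch):

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.iceberg.hadoop.HadoopStreams;
    import org.apache.iceberg.io.InputFile;

    // FSDataInputStream requires a stream that is Seekable and PositionedReadable;
    // the FSInputStream returned by wrap() satisfies both by delegating to the
    // Iceberg SeekableInputStream.
    FSDataInputStream stream =
        new FSDataInputStream(HadoopStreams.wrap(inputFile.newStream()));
    stream.seek(0);            // delegates to SeekableInputStream.seek(long)
    int first = stream.read(); // delegates to SeekableInputStream.read()
    stream.close();            // closes the underlying FileIO stream
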
diff --git a/orc/src/main/java/org/apache/iceberg/orc/FileIOFSUtil.java b/orc/src/main/java/org/apache/iceberg/orc/FileIOFSUtil.java
new file mode 100644
index 0000000000..2570799ef9
--- /dev/null
+++ b/orc/src/main/java/org/apache/iceberg/orc/FileIOFSUtil.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.orc;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+import org.apache.iceberg.hadoop.HadoopStreams;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+class FileIOFSUtil {
+  private FileIOFSUtil() {}
+
+  private static class NullFileSystem extends FileSystem {
+
+    @Override
+    public URI getUri() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FSDataInputStream open(Path f) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FSDataOutputStream create(
+        Path f,
+        FsPermission permission,
+        boolean overwrite,
+        int bufferSize,
+        short replication,
+        long blockSize,
+        Progressable progress)
+        throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
+        throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean rename(Path src, Path dst) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean delete(Path f, boolean recursive) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setWorkingDirectory(Path new_dir) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Path getWorkingDirectory() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path f) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  static class InputFileSystem extends NullFileSystem {
+    private final InputFile inputFile;
+    private final Path inputPath;
+
+    InputFileSystem(InputFile inputFile) {
+      this.inputFile = inputFile;
+      this.inputPath = new Path(inputFile.location());
+    }
+
+    @Override
+    public FSDataInputStream open(Path f) throws IOException {
+      Preconditions.checkArgument(
+          f.equals(inputPath), String.format("Input %s does not equal expected %s", f, inputPath));
+      return new FSDataInputStream(HadoopStreams.wrap(inputFile.newStream()));
+    }
+
+    @Override
+    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+      return open(f);
+    }
+  }
+
+  static class OutputFileSystem extends NullFileSystem {
+    private final OutputFile outputFile;
+    private final Path outPath;
+
+    OutputFileSystem(OutputFile outputFile) {
+      this.outputFile = outputFile;
+      this.outPath = new Path(outputFile.location());
+    }
+
+    @Override
+    public FSDataOutputStream create(Path f, boolean overwrite) throws IOException {
+      Preconditions.checkArgument(
+          f.equals(outPath), String.format("Input %s does not equal expected %s", f, outPath));
+      OutputStream outputStream = overwrite ? outputFile.createOrOverwrite() : outputFile.create();
+      return new FSDataOutputStream(outputStream, null);
+    }
+
+    @Override
+    public FSDataOutputStream create(
+        Path f,
+        FsPermission permission,
+        boolean overwrite,
+        int bufferSize,
+        short replication,
+        long blockSize,
+        Progressable progress)
+        throws IOException {
+      return create(f, overwrite);
+    }
+  }
+}
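
A hedged sketch of the contract these proxies enforce; FileIOFSUtil is
package-private, so this only compiles inside org.apache.iceberg.orc, and
`localInput` is a hypothetical InputFile (e.g. from Files.localInput(...)):

    // Each proxy file system is bound to exactly one path and one operation.
    FileIOFSUtil.InputFileSystem fs = new FileIOFSUtil.InputFileSystem(localInput);
    FSDataInputStream in = fs.open(new Path(localInput.location())); // succeeds
    // Any other path fails the precondition check:
    //   fs.open(new Path("/some/other/file"))   -> IllegalArgumentException
    // Every unrelated FileSystem operation is rejected by NullFileSystem:
    //   fs.getFileStatus(new Path("/any"))      -> UnsupportedOperationException
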
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
index 7233fe731f..89cd1ad436 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
@@ -39,6 +39,7 @@ import static org.apache.iceberg.TableProperties.ORC_WRITE_BATCH_SIZE;
 import static org.apache.iceberg.TableProperties.ORC_WRITE_BATCH_SIZE_DEFAULT;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Locale;
@@ -85,6 +86,7 @@ import org.apache.orc.OrcFile.CompressionStrategy;
 import org.apache.orc.OrcFile.ReaderOptions;
 import org.apache.orc.Reader;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
 import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 
 @SuppressWarnings("checkstyle:AbbreviationAsWordInName")
@@ -764,19 +766,41 @@ public class ORC {
     }
   }
 
-  static Reader newFileReader(String location, ReaderOptions readerOptions) {
+  static Reader newFileReader(InputFile file, Configuration config) {
+    ReaderOptions readerOptions = OrcFile.readerOptions(config).useUTCTimestamp(true);
+    if (file instanceof HadoopInputFile) {
+      readerOptions.filesystem(((HadoopInputFile) file).getFileSystem());
+    } else {
+      // For any other InputFile, wrap it in an InputFileSystem that only supports
+      // opening an input stream. Supplying the length up front avoids a file status
+      // call to determine it.
+      readerOptions.filesystem(new FileIOFSUtil.InputFileSystem(file)).maxLength(file.getLength());
+    }
     try {
-      return OrcFile.createReader(new Path(location), readerOptions);
+      return OrcFile.createReader(new Path(file.location()), readerOptions);
     } catch (IOException ioe) {
-      throw new RuntimeIOException(ioe, "Failed to open file: %s", location);
+      throw new RuntimeIOException(ioe, "Failed to open file: %s", file.location());
     }
   }
 
-  static Reader newFileReader(InputFile file, Configuration config) {
-    ReaderOptions readerOptions = OrcFile.readerOptions(config).useUTCTimestamp(true);
-    if (file instanceof HadoopInputFile) {
-      readerOptions.filesystem(((HadoopInputFile) file).getFileSystem());
+  static Writer newFileWriter(
+      OutputFile file, OrcFile.WriterOptions options, Map<String, byte[]> metadata) {
+    if (file instanceof HadoopOutputFile) {
+      options.fileSystem(((HadoopOutputFile) file).getFileSystem());
+    } else {
+      options.fileSystem(new FileIOFSUtil.OutputFileSystem(file));
     }
-    return newFileReader(file.location(), readerOptions);
+    final Path locPath = new Path(file.location());
+    final Writer writer;
+
+    try {
+      writer = OrcFile.createWriter(locPath, options);
+    } catch (IOException ioe) {
+      throw new RuntimeIOException(ioe, "Can't create file %s", locPath);
+    }
+
+    metadata.forEach((key, value) -> writer.addUserMetadata(key, ByteBuffer.wrap(value)));
+
+    return writer;
   }
 }
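
For reference, a sketch of the reader path taken for a non-Hadoop InputFile,
mirroring newFileReader above (`file` is an assumed FileIO-backed InputFile and
`conf` an existing Configuration):

    // The InputFileSystem proxy serves the stream, while maxLength() hands ORC
    // the file length so it never issues a getFileStatus() call to discover it.
    OrcFile.ReaderOptions options =
        OrcFile.readerOptions(conf)
            .useUTCTimestamp(true)
            .filesystem(new FileIOFSUtil.InputFileSystem(file))
            .maxLength(file.getLength());
    Reader reader = OrcFile.createReader(new Path(file.location()), options);
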
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
index a2c0d2ccea..b8a48645ef 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
@@ -20,13 +20,11 @@ package org.apache.iceberg.orc;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
-import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiFunction;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.Metrics;
 import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.Schema;
@@ -88,8 +86,7 @@ class OrcFileAppender<D> implements FileAppender<D> {
       options.fileSystem(((HadoopOutputFile) file).getFileSystem());
     }
     options.setSchema(orcSchema);
-    this.writer = newOrcWriter(file, options, metadata);
-
+    this.writer = ORC.newFileWriter(file, options, metadata);
     this.valueWriter = newOrcRowWriter(schema, orcSchema, createWriterFunc);
   }
 
@@ -170,22 +167,6 @@ class OrcFileAppender<D> implements FileAppender<D> {
     }
   }
 
-  private static Writer newOrcWriter(
-      OutputFile file, OrcFile.WriterOptions options, Map<String, byte[]> metadata) {
-    final Path locPath = new Path(file.location());
-    final Writer writer;
-
-    try {
-      writer = OrcFile.createWriter(locPath, options);
-    } catch (IOException ioe) {
-      throw new RuntimeIOException(ioe, "Can't create file %s", locPath);
-    }
-
-    metadata.forEach((key, value) -> writer.addUserMetadata(key, ByteBuffer.wrap(value)));
-
-    return writer;
-  }
-
   @SuppressWarnings("unchecked")
   private static <D> OrcRowWriter<D> newOrcRowWriter(
       Schema schema,
diff --git a/orc/src/test/java/org/apache/iceberg/orc/TestORCFileIOProxies.java b/orc/src/test/java/org/apache/iceberg/orc/TestORCFileIOProxies.java
new file mode 100644
index 0000000000..91e0152dee
--- /dev/null
+++ b/orc/src/test/java/org/apache/iceberg/orc/TestORCFileIOProxies.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.orc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+public class TestORCFileIOProxies {
+  @Test
+  public void testInputFileSystem() throws IOException {
+    File inputFile = File.createTempFile("read", ".orc");
+    inputFile.deleteOnExit();
+
+    InputFile localFile = Files.localInput(inputFile);
+    FileIOFSUtil.InputFileSystem ifs = new FileIOFSUtil.InputFileSystem(localFile);
+    InputStream is = ifs.open(new Path(localFile.location()));
+    assertNotNull(is);
+
+    // Cannot use the filesystem for any other operation
+    Assertions.assertThatThrownBy(() -> ifs.getFileStatus(new Path(localFile.location())))
+        .isInstanceOf(UnsupportedOperationException.class);
+
+    // Cannot use the filesystem for any other path
+    Assertions.assertThatThrownBy(() -> ifs.open(new Path("/tmp/dummy")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageStartingWith("Input /tmp/dummy does not equal expected");
+  }
+
+  @Test
+  public void testOutputFileSystem() throws IOException {
+    File localFile = File.createTempFile("write", ".orc");
+    localFile.deleteOnExit();
+
+    OutputFile outputFile = Files.localOutput(localFile);
+    FileSystem ofs = new FileIOFSUtil.OutputFileSystem(outputFile);
+    try (OutputStream os = ofs.create(new Path(outputFile.location()))) {
+      os.write('O');
+      os.write('R');
+      os.write('C');
+    }
+    // No other operation is supported
+    Assertions.assertThatThrownBy(() -> ofs.open(new Path(outputFile.location())))
+        .isInstanceOf(UnsupportedOperationException.class);
+    // No other path is supported
+    Assertions.assertThatThrownBy(() -> ofs.create(new Path("/tmp/dummy")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageStartingWith("Input /tmp/dummy does not equal expected");
+
+    FileSystem ifs = new FileIOFSUtil.InputFileSystem(outputFile.toInputFile());
+    try (InputStream is = ifs.open(new Path(outputFile.location()))) {
+      assertEquals('O', is.read());
+      assertEquals('R', is.read());
+      assertEquals('C', is.read());
+      assertEquals(-1, is.read());
+    }
+  }
+}
diff --git a/orc/src/test/java/org/apache/iceberg/orc/TestOrcDataWriter.java b/orc/src/test/java/org/apache/iceberg/orc/TestOrcDataWriter.java
index 5da7fa7b09..58b48f25b5 100644
--- a/orc/src/test/java/org/apache/iceberg/orc/TestOrcDataWriter.java
+++ b/orc/src/test/java/org/apache/iceberg/orc/TestOrcDataWriter.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileFormat;
@@ -36,13 +37,17 @@ import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.orc.GenericOrcWriter;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
+import org.apache.iceberg.io.SeekableInputStream;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
 import org.apache.orc.OrcFile;
 import org.apache.orc.StripeInformation;
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -75,9 +80,12 @@ public class TestOrcDataWriter {
   }
 
   private List<Long> stripeOffsetsFromReader(DataFile dataFile) throws IOException {
-    return OrcFile.createReader(
-            new Path(dataFile.path().toString()), OrcFile.readerOptions(new Configuration()))
-        .getStripes().stream()
+    return stripeOffsetsFromReader(dataFile, OrcFile.readerOptions(new Configuration()));
+  }
+
+  private List<Long> stripeOffsetsFromReader(DataFile dataFile, OrcFile.ReaderOptions options)
+      throws IOException {
+    return OrcFile.createReader(new Path(dataFile.path().toString()), options).getStripes().stream()
         .map(StripeInformation::getOffset)
         .collect(Collectors.toList());
   }
@@ -126,4 +134,115 @@ public class TestOrcDataWriter {
 
     Assert.assertEquals("Written records should match", records, 
writtenRecords);
   }
+
+  @Test
+  public void testUsingFileIO() throws IOException {
+    // When a file other than HadoopInputFile or HadoopOutputFile is supplied, the
+    // location's scheme is used to pick the FileSystem class; for local files that
+    // would be the LocalFileSystem. To prevent this, the proxy classes below use a
+    // scheme `dummy` that no FileSystem handles.
+    ProxyOutputFile outFile = new ProxyOutputFile(Files.localOutput(temp.newFile()));
+    Assertions.assertThatThrownBy(
+            () -> new Path(outFile.location()).getFileSystem(new Configuration()))
+        .isInstanceOf(UnsupportedFileSystemException.class)
+        .hasMessageStartingWith("No FileSystem for scheme \"dummy\"");
+
+    // Now that FileIO is handled directly, the FileSystem is not resolved from the
+    // scheme; operations go through the InputFileSystem and OutputFileSystem proxies
+    // that wrap the InputFile and OutputFile. Both write and read should succeed.
+    DataWriter<Record> dataWriter =
+        ORC.writeData(outFile)
+            .schema(SCHEMA)
+            .createWriterFunc(GenericOrcWriter::buildWriter)
+            .overwrite()
+            .withSpec(PartitionSpec.unpartitioned())
+            .build();
+
+    try {
+      for (Record record : records) {
+        dataWriter.write(record);
+      }
+    } finally {
+      dataWriter.close();
+    }
+
+    DataFile dataFile = dataWriter.toDataFile();
+    OrcFile.ReaderOptions options =
+        OrcFile.readerOptions(new Configuration())
+            .filesystem(new FileIOFSUtil.InputFileSystem(outFile.toInputFile()))
+            .maxLength(outFile.toInputFile().getLength());
+    Assert.assertEquals(dataFile.splitOffsets(), stripeOffsetsFromReader(dataFile, options));
+    Assert.assertEquals("Format should be ORC", FileFormat.ORC, 
dataFile.format());
+    Assert.assertEquals("Should be data file", FileContent.DATA, 
dataFile.content());
+    Assert.assertEquals("Record count should match", records.size(), 
dataFile.recordCount());
+    Assert.assertEquals("Partition should be empty", 0, 
dataFile.partition().size());
+    Assert.assertNull("Key metadata should be null", dataFile.keyMetadata());
+
+    List<Record> writtenRecords;
+    try (CloseableIterable<Record> reader =
+        ORC.read(outFile.toInputFile())
+            .project(SCHEMA)
+            .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(SCHEMA, fileSchema))
+            .build()) {
+      writtenRecords = Lists.newArrayList(reader);
+    }
+
+    Assert.assertEquals("Written records should match", records, 
writtenRecords);
+  }
+
+  private static class ProxyInputFile implements InputFile {
+    private final InputFile inputFile;
+
+    private ProxyInputFile(InputFile inputFile) {
+      this.inputFile = inputFile;
+    }
+
+    @Override
+    public long getLength() {
+      return inputFile.getLength();
+    }
+
+    @Override
+    public SeekableInputStream newStream() {
+      return inputFile.newStream();
+    }
+
+    @Override
+    public String location() {
+      return "dummy://" + inputFile.location();
+    }
+
+    @Override
+    public boolean exists() {
+      return inputFile.exists();
+    }
+  }
+
+  private static class ProxyOutputFile implements OutputFile {
+    private final OutputFile outputFile;
+
+    private ProxyOutputFile(OutputFile outputFile) {
+      this.outputFile = outputFile;
+    }
+
+    @Override
+    public PositionOutputStream create() {
+      return outputFile.create();
+    }
+
+    @Override
+    public PositionOutputStream createOrOverwrite() {
+      return outputFile.createOrOverwrite();
+    }
+
+    @Override
+    public String location() {
+      return "dummy://" + outputFile.location();
+    }
+
+    @Override
+    public InputFile toInputFile() {
+      return new ProxyInputFile(outputFile.toInputFile());
+    }
+  }
 }
