[ 
https://issues.apache.org/jira/browse/ARROW-1047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16263125#comment-16263125
 ] 

ASF GitHub Bot commented on ARROW-1047:
---------------------------------------

wesm closed pull request #1259: ARROW-1047: [Java] Add Generic Reader Interface 
for Stream Format
URL: https://github.com/apache/arrow/pull/1259
 
 
   

This pull request was merged from a forked repository. Because GitHub
hides the original diff of a fork's pull request once it has been merged,
the diff is reproduced below for the sake of provenance:

diff --git a/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java 
b/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java
index 3091bc4da..ce6b5164a 100644
--- a/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java
+++ b/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java
@@ -23,8 +23,8 @@
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.stream.ArrowStreamReader;
-import org.apache.arrow.vector.stream.ArrowStreamWriter;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java 
b/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
index ab8fa6e45..6e45305bf 100644
--- a/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
+++ b/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
@@ -22,8 +22,8 @@
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.file.ArrowFileReader;
-import org.apache.arrow.vector.file.ArrowFileWriter;
+import org.apache.arrow.vector.ipc.ArrowFileReader;
+import org.apache.arrow.vector.ipc.ArrowFileWriter;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java 
b/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java
index 6722b30fa..3db01f40c 100644
--- a/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java
+++ b/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java
@@ -21,8 +21,8 @@
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.file.ArrowFileReader;
-import org.apache.arrow.vector.stream.ArrowStreamWriter;
+import org.apache.arrow.vector.ipc.ArrowFileReader;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
 
 import java.io.File;
 import java.io.FileInputStream;
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/Integration.java 
b/java/tools/src/main/java/org/apache/arrow/tools/Integration.java
index d2b35e65a..666f1ddea 100644
--- a/java/tools/src/main/java/org/apache/arrow/tools/Integration.java
+++ b/java/tools/src/main/java/org/apache/arrow/tools/Integration.java
@@ -22,11 +22,11 @@
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.file.ArrowBlock;
-import org.apache.arrow.vector.file.ArrowFileReader;
-import org.apache.arrow.vector.file.ArrowFileWriter;
-import org.apache.arrow.vector.file.json.JsonFileReader;
-import org.apache.arrow.vector.file.json.JsonFileWriter;
+import org.apache.arrow.vector.ipc.message.ArrowBlock;
+import org.apache.arrow.vector.ipc.ArrowFileReader;
+import org.apache.arrow.vector.ipc.ArrowFileWriter;
+import org.apache.arrow.vector.ipc.JsonFileReader;
+import org.apache.arrow.vector.ipc.JsonFileWriter;
 import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java 
b/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java
index ef1a11f6b..42d336af9 100644
--- a/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java
+++ b/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java
@@ -21,8 +21,8 @@
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.file.ArrowFileWriter;
-import org.apache.arrow.vector.stream.ArrowStreamReader;
+import org.apache.arrow.vector.ipc.ArrowFileWriter;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
 
 import java.io.File;
 import java.io.FileInputStream;
diff --git 
a/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java 
b/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java
index c56a5a330..eac517d96 100644
--- a/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java
+++ b/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java
@@ -28,9 +28,9 @@
 import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter;
 import org.apache.arrow.vector.complex.writer.BigIntWriter;
 import org.apache.arrow.vector.complex.writer.IntWriter;
-import org.apache.arrow.vector.file.ArrowBlock;
-import org.apache.arrow.vector.file.ArrowFileReader;
-import org.apache.arrow.vector.file.ArrowFileWriter;
+import org.apache.arrow.vector.ipc.message.ArrowBlock;
+import org.apache.arrow.vector.ipc.ArrowFileReader;
+import org.apache.arrow.vector.ipc.ArrowFileWriter;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.junit.Assert;
 
diff --git 
a/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java 
b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
index 89714e447..d8693c596 100644
--- a/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
+++ b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
@@ -44,8 +44,8 @@
 import org.apache.arrow.vector.dictionary.Dictionary;
 import org.apache.arrow.vector.dictionary.DictionaryProvider;
 import 
org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider;
-import org.apache.arrow.vector.stream.ArrowStreamReader;
-import org.apache.arrow.vector.stream.ArrowStreamWriter;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
 import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.ArrowType.Int;
diff --git a/java/vector/src/main/codegen/templates/UnionVector.java 
b/java/vector/src/main/codegen/templates/UnionVector.java
index e44edbd47..73165315e 100644
--- a/java/vector/src/main/codegen/templates/UnionVector.java
+++ b/java/vector/src/main/codegen/templates/UnionVector.java
@@ -32,7 +32,7 @@
 import org.apache.arrow.vector.BaseDataValueVector;
 import org.apache.arrow.vector.complex.impl.ComplexCopier;
 import org.apache.arrow.vector.util.CallBack;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
 import org.apache.arrow.memory.BaseAllocator;
 import org.apache.arrow.vector.BaseValueVector;
 import org.apache.arrow.vector.util.OversizedAllocationException;
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java 
b/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java
index 38524ff8a..6d9eb1db0 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java
@@ -22,7 +22,7 @@
 import java.util.List;
 
 import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
 
 import io.netty.buffer.ArrowBuf;
 import org.apache.arrow.vector.util.CallBack;
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableFixedWidthVector.java
 
b/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableFixedWidthVector.java
index 209758e4e..f82077f69 100644
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableFixedWidthVector.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableFixedWidthVector.java
@@ -28,7 +28,7 @@
 import org.apache.arrow.memory.OutOfMemoryException;
 import org.apache.arrow.memory.BaseAllocator;
 import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
 import org.apache.arrow.vector.util.CallBack;
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableVariableWidthVector.java
 
b/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableVariableWidthVector.java
index edf4987de..b9e5442ec 100644
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableVariableWidthVector.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableVariableWidthVector.java
@@ -25,7 +25,7 @@
 import org.apache.arrow.memory.BaseAllocator;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.complex.NullableMapVector;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
 import org.apache.arrow.vector.util.CallBack;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java 
b/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java
index c6d404e15..26c817008 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java
@@ -24,7 +24,7 @@
 import org.apache.arrow.vector.complex.reader.FieldReader;
 import org.apache.arrow.vector.holders.BitHolder;
 import org.apache.arrow.vector.holders.NullableBitHolder;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
 import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.util.OversizedAllocationException;
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/BitVectorHelper.java 
b/java/vector/src/main/java/org/apache/arrow/vector/BitVectorHelper.java
index 23252ca69..2d4db85c5 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/BitVectorHelper.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/BitVectorHelper.java
@@ -20,7 +20,7 @@
 
 import io.netty.buffer.ArrowBuf;
 import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
 
 /**
  * Helper class for performing generic operations on a bit vector buffer.
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/BufferBacked.java 
b/java/vector/src/main/java/org/apache/arrow/vector/BufferBacked.java
index a0dbf2bdc..332ca228a 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/BufferBacked.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/BufferBacked.java
@@ -18,7 +18,7 @@
 
 package org.apache.arrow.vector;
 
-import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
 
 import io.netty.buffer.ArrowBuf;
 
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java 
b/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java
index c2ed17eb4..509eeda75 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java
@@ -20,7 +20,7 @@
 
 import java.util.List;
 
-import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
 import org.apache.arrow.vector.types.pojo.Field;
 
 import io.netty.buffer.ArrowBuf;
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java 
b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
index 58fc80bbb..2cd4099c6 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
@@ -24,9 +24,9 @@
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.arrow.vector.schema.ArrowFieldNode;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
-import org.apache.arrow.vector.schema.VectorLayout;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.VectorLayout;
 import org.apache.arrow.vector.types.pojo.Field;
 
 import com.google.common.collect.Iterators;
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java 
b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
index fd9677312..2b034894a 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
@@ -22,9 +22,9 @@
 import java.util.List;
 
 import io.netty.buffer.ArrowBuf;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
-import org.apache.arrow.vector.schema.ArrowVectorType;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.ArrowVectorType;
 
 public class VectorUnloader {
 
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java 
b/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java
index 3cc93a2a3..0ab3a7b68 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java
@@ -28,7 +28,7 @@
 import org.apache.arrow.memory.OutOfMemoryException;
 import org.apache.arrow.vector.complex.impl.NullReader;
 import org.apache.arrow.vector.complex.reader.FieldReader;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
 import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.arrow.vector.types.pojo.ArrowType.Null;
 import org.apache.arrow.vector.types.pojo.Field;
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java
 
b/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java
index 6713b1c78..774a10dbf 100644
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java
@@ -33,7 +33,7 @@
 import org.apache.arrow.memory.OutOfMemoryException;
 import org.apache.arrow.vector.*;
 import org.apache.arrow.vector.complex.impl.UnionFixedSizeListReader;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
 import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java 
b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
index afe86a692..d50d4c447 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
@@ -42,7 +42,7 @@
 import org.apache.arrow.vector.complex.impl.UnionListWriter;
 import org.apache.arrow.vector.complex.reader.FieldReader;
 import org.apache.arrow.vector.complex.writer.FieldWriter;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
 import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/complex/NullableMapVector.java
 
b/java/vector/src/main/java/org/apache/arrow/vector/complex/NullableMapVector.java
index f95302f55..e223d1ce6 100644
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/complex/NullableMapVector.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/complex/NullableMapVector.java
@@ -34,7 +34,7 @@
 import org.apache.arrow.vector.complex.impl.NullableMapReaderImpl;
 import org.apache.arrow.vector.complex.impl.NullableMapWriter;
 import org.apache.arrow.vector.holders.ComplexHolder;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.ArrowType.Struct;
 import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileReader.java 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileReader.java
similarity index 77%
rename from 
java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileReader.java
rename to 
java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileReader.java
index d711b9c6c..4cd702622 100644
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileReader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileReader.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -26,32 +26,45 @@
 
 import org.apache.arrow.flatbuf.Footer;
 import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.schema.ArrowDictionaryBatch;
-import org.apache.arrow.vector.schema.ArrowMessage;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
-import org.apache.arrow.vector.stream.MessageSerializer;
+import org.apache.arrow.vector.ipc.message.ArrowBlock;
+import org.apache.arrow.vector.ipc.message.ArrowFooter;
+import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.MessageSerializer;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ArrowFileReader extends ArrowReader<SeekableReadChannel> {
+public class ArrowFileReader extends ArrowReader {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ArrowFileReader.class);
 
+  private SeekableReadChannel in;
   private ArrowFooter footer;
   private int currentDictionaryBatch = 0;
   private int currentRecordBatch = 0;
 
+  public ArrowFileReader(SeekableReadChannel in, BufferAllocator allocator) {
+    super(allocator);
+    this.in = in;
+  }
+
   public ArrowFileReader(SeekableByteChannel in, BufferAllocator allocator) {
-    super(new SeekableReadChannel(in), allocator);
+    this(new SeekableReadChannel(in), allocator);
   }
 
-  public ArrowFileReader(SeekableReadChannel in, BufferAllocator allocator) {
-    super(in, allocator);
+  @Override
+  public long bytesRead() {
+    return in.bytesRead();
+  }
+
+  @Override
+  protected void closeReadSource() throws IOException {
+    in.close();
   }
 
   @Override
-  protected Schema readSchema(SeekableReadChannel in) throws IOException {
+  protected Schema readSchema() throws IOException {
     if (footer == null) {
       if (in.size() <= (ArrowMagic.MAGIC_LENGTH * 2 + 4)) {
         throw new InvalidArrowFileException("file too small: " + in.size());
@@ -82,18 +95,30 @@ protected Schema readSchema(SeekableReadChannel in) throws 
IOException {
   }
 
   @Override
-  protected ArrowMessage readMessage(SeekableReadChannel in, BufferAllocator 
allocator) throws IOException {
-    if (currentDictionaryBatch < footer.getDictionaries().size()) {
-      ArrowBlock block = 
footer.getDictionaries().get(currentDictionaryBatch++);
-      return readDictionaryBatch(in, block, allocator);
-    } else if (currentRecordBatch < footer.getRecordBatches().size()) {
+  public ArrowDictionaryBatch readDictionary() throws IOException {
+    if (currentDictionaryBatch >= footer.getDictionaries().size()) {
+      throw new IOException("Requested more dictionaries than defined in 
footer: " + currentDictionaryBatch);
+    }
+    ArrowBlock block = footer.getDictionaries().get(currentDictionaryBatch++);
+    return readDictionaryBatch(in, block, allocator);
+  }
+
+  // Returns true if a batch was read, false if no more batches
+  @Override
+  public boolean loadNextBatch() throws IOException {
+    prepareLoadNextBatch();
+
+    if (currentRecordBatch < footer.getRecordBatches().size()) {
       ArrowBlock block = footer.getRecordBatches().get(currentRecordBatch++);
-      return readRecordBatch(in, block, allocator);
+      ArrowRecordBatch batch = readRecordBatch(in, block, allocator);
+      loadRecordBatch(batch);
+      return true;
     } else {
-      return null;
+      return false;
     }
   }
 
+
   public List<ArrowBlock> getDictionaryBlocks() throws IOException {
     ensureInitialized();
     return footer.getDictionaries();
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileWriter.java 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileWriter.java
similarity index 94%
rename from 
java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileWriter.java
rename to 
java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileWriter.java
index 1d92d2bde..1b687c9f2 100644
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileWriter.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc;
 
 import java.io.IOException;
 import java.nio.channels.WritableByteChannel;
@@ -24,6 +24,8 @@
 
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.dictionary.DictionaryProvider;
+import org.apache.arrow.vector.ipc.message.ArrowBlock;
+import org.apache.arrow.vector.ipc.message.ArrowFooter;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowMagic.java 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowMagic.java
similarity index 93%
rename from 
java/vector/src/main/java/org/apache/arrow/vector/file/ArrowMagic.java
rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowMagic.java
index 68313e787..a9310a608 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowMagic.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowMagic.java
@@ -16,7 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc;
+
+import org.apache.arrow.vector.ipc.WriteChannel;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java
similarity index 65%
rename from 
java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java
index 21fb2207e..6d708a03c 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -33,32 +33,25 @@
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.dictionary.Dictionary;
 import org.apache.arrow.vector.dictionary.DictionaryProvider;
-import org.apache.arrow.vector.schema.ArrowDictionaryBatch;
-import org.apache.arrow.vector.schema.ArrowMessage;
-import org.apache.arrow.vector.schema.ArrowMessage.ArrowMessageVisitor;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.arrow.vector.util.DictionaryUtility;
 
 /**
- * Abstract class to read ArrowRecordBatches from a ReadChannel.
+ * Abstract class to read Schema and ArrowRecordBatches.
  *
- * @param <T> Type of ReadChannel to use
  */
-public abstract class ArrowReader<T extends ReadChannel> implements 
DictionaryProvider, AutoCloseable {
-
-  private final T in;
-  private final BufferAllocator allocator;
+public abstract class ArrowReader implements DictionaryProvider, AutoCloseable 
{
 
+  protected final BufferAllocator allocator;
   private VectorLoader loader;
   private VectorSchemaRoot root;
   private Map<Long, Dictionary> dictionaries;
-
   private boolean initialized = false;
 
-  protected ArrowReader(T in, BufferAllocator allocator) {
-    this.in = in;
+  protected ArrowReader(BufferAllocator allocator) {
     this.allocator = allocator;
   }
 
@@ -105,58 +98,18 @@ public Dictionary lookup(long id) {
    * @return true if a batch was read, false on EOS
    * @throws IOException
    */
-  public boolean loadNextBatch() throws IOException {
-    ensureInitialized();
-    // read in all dictionary batches, then stop after our first record batch
-    ArrowMessageVisitor<Boolean> visitor = new ArrowMessageVisitor<Boolean>() {
-      @Override
-      public Boolean visit(ArrowDictionaryBatch message) {
-        try {
-          load(message);
-        } finally {
-          message.close();
-        }
-        return true;
-      }
-
-      @Override
-      public Boolean visit(ArrowRecordBatch message) {
-        try {
-          loader.load(message);
-        } finally {
-          message.close();
-        }
-        return false;
-      }
-    };
-    root.setRowCount(0);
-    ArrowMessage message = readMessage(in, allocator);
-
-    boolean readBatch = false;
-    while (message != null) {
-      if (!message.accepts(visitor)) {
-        readBatch = true;
-        break;
-      }
-      // else read a dictionary
-      message = readMessage(in, allocator);
-    }
-
-    return readBatch;
-  }
+  public abstract boolean loadNextBatch() throws IOException;
 
   /**
    * Return the number of bytes read from the ReadChannel.
    *
    * @return number of bytes read
    */
-  public long bytesRead() {
-    return in.bytesRead();
-  }
+  public abstract long bytesRead();
 
   /**
    * Close resources, including vector schema root and dictionary vectors, and 
the
-   * underlying ReadChannel.
+   * underlying read source.
    *
    * @throws IOException
    */
@@ -167,12 +120,12 @@ public void close() throws IOException {
 
   /**
    * Close resources, including vector schema root and dictionary vectors. If 
the flag
-   * closeReadChannel is true then close the underlying ReadChannel, otherwise 
leave it open.
+   * closeReadChannel is true then close the underlying read source, otherwise 
leave it open.
    *
-   * @param closeReadChannel Flag to control if closing the underlying 
ReadChannel
+   * @param closeReadSource Flag to control if closing the underlying read 
source
    * @throws IOException
    */
-  public void close(boolean closeReadChannel) throws IOException {
+  public void close(boolean closeReadSource) throws IOException {
     if (initialized) {
       root.close();
       for (Dictionary dictionary : dictionaries.values()) {
@@ -180,15 +133,40 @@ public void close(boolean closeReadChannel) throws 
IOException {
       }
     }
 
-    if (closeReadChannel) {
-      in.close();
+    if (closeReadSource) {
+      closeReadSource();
     }
   }
 
-  protected abstract Schema readSchema(T in) throws IOException;
+  /**
+   * Close the underlying read source.
+   *
+   * @throws IOException
+   */
+  protected abstract void closeReadSource() throws IOException;
+
+  /**
+   * Read the Schema from the source, will be invoked at the beginning the 
initialization.
+   *
+   * @return the read Schema
+   * @throws IOException
+   */
+  protected abstract Schema readSchema() throws IOException;
 
-  protected abstract ArrowMessage readMessage(T in, BufferAllocator allocator) 
throws IOException;
+  /**
+   * Read a dictionary batch from the source, will be invoked after the schema 
has been read and
+   * called N times, where N is the number of dictionaries indicated by the 
schema Fields.
+   *
+   * @return the read ArrowDictionaryBatch
+   * @throws IOException
+   */
+  protected abstract ArrowDictionaryBatch readDictionary() throws IOException;
 
+  /**
+   * Initialize if not done previously.
+   *
+   * @throws IOException
+   */
   protected void ensureInitialized() throws IOException {
     if (!initialized) {
       initialize();
@@ -200,7 +178,7 @@ protected void ensureInitialized() throws IOException {
    * Reads the schema and initializes the vectors
    */
   private void initialize() throws IOException {
-    Schema originalSchema = readSchema(in);
+    Schema originalSchema = readSchema();
     List<Field> fields = new ArrayList<>();
     List<FieldVector> vectors = new ArrayList<>();
     Map<Long, Dictionary> dictionaries = new HashMap<>();
@@ -216,9 +194,43 @@ private void initialize() throws IOException {
     this.root = new VectorSchemaRoot(schema, vectors, 0);
     this.loader = new VectorLoader(root);
     this.dictionaries = Collections.unmodifiableMap(dictionaries);
+
+    // Read and load all dictionaries from schema
+    for (int i = 0; i < dictionaries.size(); i++) {
+      ArrowDictionaryBatch dictionaryBatch = readDictionary();
+      loadDictionary(dictionaryBatch);
+    }
   }
 
-  private void load(ArrowDictionaryBatch dictionaryBatch) {
+  /**
+   * Ensure the reader has been initialized and reset the VectorSchemaRoot row 
count to 0.
+   *
+   * @throws IOException
+   */
+  protected void prepareLoadNextBatch() throws IOException {
+    ensureInitialized();
+    root.setRowCount(0);
+  }
+
+  /**
+   * Load an ArrowRecordBatch to the readers VectorSchemaRoot.
+   *
+   * @param batch the record batch to load
+   */
+  protected void loadRecordBatch(ArrowRecordBatch batch) {
+    try {
+      loader.load(batch);
+    } finally {
+      batch.close();
+    }
+  }
+
+  /**
+   * Load an ArrowDictionaryBatch to the readers dictionary vectors.
+   *
+   * @param dictionaryBatch
+   */
+  protected void loadDictionary(ArrowDictionaryBatch dictionaryBatch) {
     long id = dictionaryBatch.getDictionaryId();
     Dictionary dictionary = dictionaries.get(id);
     if (dictionary == null) {
@@ -227,6 +239,10 @@ private void load(ArrowDictionaryBatch dictionaryBatch) {
     FieldVector vector = dictionary.getVector();
     VectorSchemaRoot root = new 
VectorSchemaRoot(ImmutableList.of(vector.getField()), ImmutableList.of(vector), 
0);
     VectorLoader loader = new VectorLoader(root);
-    loader.load(dictionaryBatch.getDictionary());
+    try {
+      loader.load(dictionaryBatch.getDictionary());
+    } finally {
+      dictionaryBatch.close();
+    }
   }
 }
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java
new file mode 100644
index 000000000..d1e480218
--- /dev/null
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.ipc;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+
+import org.apache.arrow.flatbuf.Message;
+import org.apache.arrow.flatbuf.MessageHeader;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.MessageChannelReader;
+import org.apache.arrow.vector.ipc.message.MessageReader;
+import org.apache.arrow.vector.ipc.message.MessageSerializer;
+import org.apache.arrow.vector.ipc.ReadChannel;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+/**
+ * This class reads from an input stream and produces ArrowRecordBatches.
+ */
+public class ArrowStreamReader extends ArrowReader {
+
+  private MessageReader messageReader;
+
+  /**
+   * Constructs a streaming reader using the MessageReader interface. Non-blocking.
+   *
+   * @param messageReader reader interface used to read messages
+   * @param allocator to allocate new buffers
+   */
+  public ArrowStreamReader(MessageReader messageReader, BufferAllocator 
allocator) {
+    super(allocator);
+    this.messageReader = messageReader;
+  }
+
+  /**
+   * Constructs a streaming reader from a ReadableByteChannel input. Non-blocking.
+   *
+   * @param in ReadableByteChannel to read messages from
+   * @param allocator to allocate new buffers
+   */
+  public ArrowStreamReader(ReadableByteChannel in, BufferAllocator allocator) {
+    this(new MessageChannelReader(new ReadChannel(in)), allocator);
+  }
+
+  /**
+   * Constructs a streaming reader from an InputStream. Non-blocking.
+   *
+   * @param in InputStream to read messages from
+   * @param allocator to allocate new buffers
+   */
+  public ArrowStreamReader(InputStream in, BufferAllocator allocator) {
+    this(Channels.newChannel(in), allocator);
+  }
+
+  /**
+   * Get the number of bytes read from the stream since constructing the reader.
+   *
+   * @return number of bytes
+   */
+  @Override
+  public long bytesRead() {
+    return messageReader.bytesRead();
+  }
+
+  /**
+   * Closes the underlying read source.
+   *
+   * @throws IOException
+   */
+  @Override
+  protected void closeReadSource() throws IOException {
+    messageReader.close();
+  }
+
+  /**
+   * Load the next ArrowRecordBatch to the vector schema root if available.
+   *
+   * @return true if a batch was read, false on EOS
+   * @throws IOException
+   */
+  public boolean loadNextBatch() throws IOException {
+    prepareLoadNextBatch();
+
+    Message message = messageReader.readNextMessage();
+
+    // Reached EOS
+    if (message == null) {
+      return false;
+    }
+
+    if (message.headerType() != MessageHeader.RecordBatch) {
+      throw new IOException("Expected RecordBatch but header was " + 
message.headerType());
+    }
+
+    ArrowRecordBatch batch = 
MessageSerializer.deserializeRecordBatch(messageReader, message, allocator);
+    loadRecordBatch(batch);
+    return true;
+  }
+
+  /**
+   * Reads the schema message from the beginning of the stream.
+   *
+   * @return the deserialized arrow schema
+   */
+  @Override
+  protected Schema readSchema() throws IOException {
+    return MessageSerializer.deserializeSchema(messageReader);
+  }
+
+  /**
+   * Read a dictionary batch message. This is invoked after the schema and before normal record
+   * batches are read.
+   *
+   * @return the deserialized dictionary batch
+   * @throws IOException
+   */
+  @Override
+  protected ArrowDictionaryBatch readDictionary() throws IOException {
+    Message message = messageReader.readNextMessage();
+
+    if (message.headerType() != MessageHeader.DictionaryBatch) {
+      throw new IOException("Expected DictionaryBatch but header was " + 
message.headerType());
+    }
+
+    return MessageSerializer.deserializeDictionaryBatch(messageReader, 
message, allocator);
+  }
+}
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamWriter.java
 b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java
similarity index 84%
rename from 
java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamWriter.java
rename to 
java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java
index b854cd2bb..d731d05b8 100644
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamWriter.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java
@@ -16,16 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.stream;
+package org.apache.arrow.vector.ipc;
 
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.dictionary.DictionaryProvider;
-import org.apache.arrow.vector.file.ArrowBlock;
-import org.apache.arrow.vector.file.ArrowWriter;
-import org.apache.arrow.vector.file.WriteChannel;
-import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.ipc.message.ArrowBlock;
+import org.apache.arrow.vector.ipc.ArrowWriter;
+import org.apache.arrow.vector.ipc.WriteChannel;
 import org.apache.arrow.vector.types.pojo.Schema;
 
 import java.io.IOException;
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java
similarity index 95%
rename from 
java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java
rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java
index 7dc10b5e6..4b483d010 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc;
 
 import java.io.IOException;
 import java.nio.channels.WritableByteChannel;
@@ -30,9 +30,10 @@
 import org.apache.arrow.vector.VectorUnloader;
 import org.apache.arrow.vector.dictionary.Dictionary;
 import org.apache.arrow.vector.dictionary.DictionaryProvider;
-import org.apache.arrow.vector.schema.ArrowDictionaryBatch;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
-import org.apache.arrow.vector.stream.MessageSerializer;
+import org.apache.arrow.vector.ipc.message.ArrowBlock;
+import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.MessageSerializer;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.arrow.vector.util.DictionaryUtility;
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/file/InvalidArrowFileException.java
 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/InvalidArrowFileException.java
similarity index 96%
rename from 
java/vector/src/main/java/org/apache/arrow/vector/file/InvalidArrowFileException.java
rename to 
java/vector/src/main/java/org/apache/arrow/vector/ipc/InvalidArrowFileException.java
index 607207f41..ad9d8776e 100644
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/file/InvalidArrowFileException.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/InvalidArrowFileException.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc;
 
 public class InvalidArrowFileException extends RuntimeException {
   private static final long serialVersionUID = 1L;
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java
 b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java
similarity index 98%
rename from 
java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java
rename to 
java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java
index 8017b385d..cb11a2530 100644
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java
@@ -16,19 +16,18 @@
  * limitations under the License.
  
******************************************************************************/
 
-package org.apache.arrow.vector.file.json;
+package org.apache.arrow.vector.ipc;
 
 import static com.fasterxml.jackson.core.JsonToken.END_ARRAY;
 import static com.fasterxml.jackson.core.JsonToken.END_OBJECT;
 import static com.fasterxml.jackson.core.JsonToken.START_ARRAY;
 import static com.fasterxml.jackson.core.JsonToken.START_OBJECT;
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.arrow.vector.schema.ArrowVectorType.*;
+import static org.apache.arrow.vector.ipc.message.ArrowVectorType.*;
 
 import java.io.File;
 import java.io.IOException;
 import java.math.BigDecimal;
-import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -42,9 +41,8 @@
 import org.apache.arrow.vector.*;
 import org.apache.arrow.vector.dictionary.Dictionary;
 import org.apache.arrow.vector.dictionary.DictionaryProvider;
-import org.apache.arrow.vector.file.InvalidArrowFileException;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
-import org.apache.arrow.vector.schema.ArrowVectorType;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowVectorType;
 import org.apache.arrow.vector.types.Types;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileWriter.java
 b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileWriter.java
similarity index 98%
rename from 
java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileWriter.java
rename to 
java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileWriter.java
index 0c8507b51..22423b844 100644
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileWriter.java
@@ -16,9 +16,9 @@
  * limitations under the License.
  
******************************************************************************/
 
-package org.apache.arrow.vector.file.json;
+package org.apache.arrow.vector.ipc;
 
-import static org.apache.arrow.vector.schema.ArrowVectorType.*;
+import static org.apache.arrow.vector.ipc.message.ArrowVectorType.*;
 
 import java.io.File;
 import java.io.IOException;
@@ -33,7 +33,7 @@
 import org.apache.arrow.vector.*;
 import org.apache.arrow.vector.dictionary.Dictionary;
 import org.apache.arrow.vector.dictionary.DictionaryProvider;
-import org.apache.arrow.vector.schema.ArrowVectorType;
+import org.apache.arrow.vector.ipc.message.ArrowVectorType;
 import org.apache.arrow.vector.types.Types;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/file/ReadChannel.java 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ReadChannel.java
similarity index 98%
rename from 
java/vector/src/main/java/org/apache/arrow/vector/file/ReadChannel.java
rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/ReadChannel.java
index b0eb8f3d8..395fd7db5 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/ReadChannel.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ReadChannel.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/file/SeekableReadChannel.java
 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/SeekableReadChannel.java
similarity index 97%
rename from 
java/vector/src/main/java/org/apache/arrow/vector/file/SeekableReadChannel.java
rename to 
java/vector/src/main/java/org/apache/arrow/vector/ipc/SeekableReadChannel.java
index 46bea1314..62ba3b73e 100644
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/file/SeekableReadChannel.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/SeekableReadChannel.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc;
 
 import java.io.IOException;
 import java.nio.channels.SeekableByteChannel;
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/file/WriteChannel.java 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/WriteChannel.java
similarity index 97%
rename from 
java/vector/src/main/java/org/apache/arrow/vector/file/WriteChannel.java
rename to 
java/vector/src/main/java/org/apache/arrow/vector/ipc/WriteChannel.java
index 89c9d1f9b..da500aa97 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/WriteChannel.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/WriteChannel.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -25,7 +25,7 @@
 import com.google.flatbuffers.FlatBufferBuilder;
 
 import io.netty.buffer.ArrowBuf;
-import org.apache.arrow.vector.schema.FBSerializable;
+import org.apache.arrow.vector.ipc.message.FBSerializable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowBlock.java 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBlock.java
similarity index 96%
rename from 
java/vector/src/main/java/org/apache/arrow/vector/file/ArrowBlock.java
rename to 
java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBlock.java
index e1b4d6a8b..8731f77ac 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowBlock.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBlock.java
@@ -16,10 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc.message;
 
 import org.apache.arrow.flatbuf.Block;
-import org.apache.arrow.vector.schema.FBSerializable;
 
 import com.google.flatbuffers.FlatBufferBuilder;
 
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowBuffer.java 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBuffer.java
similarity index 97%
rename from 
java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowBuffer.java
rename to 
java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBuffer.java
index 4e0187e79..6b0eeaad4 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowBuffer.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBuffer.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.schema;
+package org.apache.arrow.vector.ipc.message;
 
 import org.apache.arrow.flatbuf.Buffer;
 
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowDictionaryBatch.java
 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowDictionaryBatch.java
similarity index 97%
rename from 
java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowDictionaryBatch.java
rename to 
java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowDictionaryBatch.java
index 635fa3fb4..cd23cb96b 100644
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowDictionaryBatch.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowDictionaryBatch.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.schema;
+package org.apache.arrow.vector.ipc.message;
 
 import com.google.flatbuffers.FlatBufferBuilder;
 import org.apache.arrow.flatbuf.DictionaryBatch;
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowFieldNode.java 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowFieldNode.java
similarity index 97%
rename from 
java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowFieldNode.java
rename to 
java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowFieldNode.java
index 3ed384ed7..ca0087f70 100644
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowFieldNode.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowFieldNode.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.schema;
+package org.apache.arrow.vector.ipc.message;
 
 import org.apache.arrow.flatbuf.FieldNode;
 
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowFooter.java
similarity index 96%
rename from 
java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java
rename to 
java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowFooter.java
index 1e95321fd..f7794f736 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowFooter.java
@@ -16,16 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc.message;
 
-import static 
org.apache.arrow.vector.schema.FBSerializables.writeAllStructsToVector;
+import static 
org.apache.arrow.vector.ipc.message.FBSerializables.writeAllStructsToVector;
 
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.arrow.flatbuf.Block;
 import org.apache.arrow.flatbuf.Footer;
-import org.apache.arrow.vector.schema.FBSerializable;
 import org.apache.arrow.vector.types.pojo.Schema;
 
 import com.google.flatbuffers.FlatBufferBuilder;
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowMessage.java 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowMessage.java
similarity index 96%
rename from 
java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowMessage.java
rename to 
java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowMessage.java
index f59b4b6c1..92fb58e16 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowMessage.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowMessage.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.schema;
+package org.apache.arrow.vector.ipc.message;
 
 public interface ArrowMessage extends FBSerializable, AutoCloseable {
 
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java
 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java
similarity index 94%
rename from 
java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java
rename to 
java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java
index bf0967a27..6c6481e74 100644
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java
@@ -16,9 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.schema;
-
-import static 
org.apache.arrow.vector.schema.FBSerializables.writeAllStructsToVector;
+package org.apache.arrow.vector.ipc.message;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -113,9 +111,9 @@ public int getLength() {
   @Override
   public int writeTo(FlatBufferBuilder builder) {
     RecordBatch.startNodesVector(builder, nodes.size());
-    int nodesOffset = writeAllStructsToVector(builder, nodes);
+    int nodesOffset = FBSerializables.writeAllStructsToVector(builder, nodes);
     RecordBatch.startBuffersVector(builder, buffers.size());
-    int buffersOffset = writeAllStructsToVector(builder, buffersLayout);
+    int buffersOffset = FBSerializables.writeAllStructsToVector(builder, 
buffersLayout);
     RecordBatch.startRecordBatch(builder);
     RecordBatch.addLength(builder, length);
     RecordBatch.addNodes(builder, nodesOffset);
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowVectorType.java 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowVectorType.java
similarity index 98%
rename from 
java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowVectorType.java
rename to 
java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowVectorType.java
index 9d2fdfaaf..3342652be 100644
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowVectorType.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowVectorType.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.schema;
+package org.apache.arrow.vector.ipc.message;
 
 import java.util.Map;
 
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializable.java 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/FBSerializable.java
similarity index 95%
rename from 
java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializable.java
rename to 
java/vector/src/main/java/org/apache/arrow/vector/ipc/message/FBSerializable.java
index 91d60ea99..31f55bd52 100644
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializable.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/FBSerializable.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.schema;
+package org.apache.arrow.vector.ipc.message;
 
 import com.google.flatbuffers.FlatBufferBuilder;
 
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializables.java 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/FBSerializables.java
similarity index 96%
rename from 
java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializables.java
rename to 
java/vector/src/main/java/org/apache/arrow/vector/ipc/message/FBSerializables.java
index ae5aa555e..6717ed7ab 100644
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializables.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/FBSerializables.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.schema;
+package org.apache.arrow.vector.ipc.message;
 
 import java.util.ArrayList;
 import java.util.Collections;
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageChannelReader.java
 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageChannelReader.java
new file mode 100644
index 000000000..5bc3e1fff
--- /dev/null
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageChannelReader.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.ipc.message;
+
+
+import io.netty.buffer.ArrowBuf;
+import org.apache.arrow.flatbuf.Message;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.ipc.ReadChannel;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Reads a sequence of messages using a ReadChannel.
+ */
+public class MessageChannelReader implements MessageReader {
+
+  private ReadChannel in;
+
+  /**
+   * Construct from an existing ReadChannel.
+   *
+   * @param in Channel to read messages from
+   */
+  public MessageChannelReader(ReadChannel in) {
+    this.in = in;
+  }
+
+  /**
+   * Read the next message from the ReadChannel.
+   *
+   * @return A Message, or null if the ReadChannel has no more messages (the length prefix could not be read, or the message length is 0)
+   * @throws IOException
+   */
+  @Override
+  public Message readNextMessage() throws IOException {
+    // Read the message size. There is an i32 little endian prefix.
+    ByteBuffer buffer = ByteBuffer.allocate(4);
+    if (in.readFully(buffer) != 4) {
+      return null;
+    }
+    int messageLength = MessageSerializer.bytesToInt(buffer.array());
+    if (messageLength == 0) {
+      return null;
+    }
+
+    buffer = ByteBuffer.allocate(messageLength);
+    if (in.readFully(buffer) != messageLength) {
+      throw new IOException(
+          "Unexpected end of stream trying to read message.");
+    }
+    buffer.rewind();
+
+    return Message.getRootAsMessage(buffer);
+  }
+
+  /**
+   * Read a message body from the ReadChannel.
+   *
+   * @param message Read message that is followed by a body of data
+   * @param allocator BufferAllocator to allocate memory for body data
+   * @return ArrowBuf containing the message body data
+   * @throws IOException
+   */
+  @Override
+  public ArrowBuf readMessageBody(Message message, BufferAllocator allocator) 
throws IOException {
+
+    int bodyLength = (int) message.bodyLength();
+
+    // Now read the record batch body
+    ArrowBuf buffer = allocator.buffer(bodyLength);
+    if (in.readFully(buffer, bodyLength) != bodyLength) {
+      throw new IOException("Unexpected end of input trying to read batch.");
+    }
+
+    return buffer;
+  }
+
+  /**
+   * Get the number of bytes read from the ReadChannel.
+   *
+   * @return number of bytes
+   */
+  @Override
+  public long bytesRead() {
+    return in.bytesRead();
+  }
+
+  /**
+   * Close the ReadChannel.
+   *
+   * @throws IOException
+   */
+  @Override
+  public void close() throws IOException {
+    in.close();
+  }
+}
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageReader.java
 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageReader.java
new file mode 100644
index 000000000..b277c5829
--- /dev/null
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageReader.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.ipc.message;
+
+
+import io.netty.buffer.ArrowBuf;
+import org.apache.arrow.flatbuf.Message;
+import org.apache.arrow.memory.BufferAllocator;
+
+import java.io.IOException;
+
+/**
+ * Interface for reading a sequence of messages.
+ */
+public interface MessageReader {
+
+  /**
+   * Read the next message in the sequence.
+   *
+   * @return The read message, or null if the end of the message sequence has been reached
+   * @throws IOException
+   */
+  Message readNextMessage() throws IOException;
+
+  /**
+   * When a message is followed by a body of data, read that data into an ArrowBuf. This should
+   * only be called when a Message has a body length > 0.
+   *
+   * @param message Read message that is followed by a body of data
+   * @param allocator BufferAllocator to allocate memory for body data
+   * @return An ArrowBuf containing the body of the message that was read
+   * @throws IOException
+   */
+  ArrowBuf readMessageBody(Message message, BufferAllocator allocator) throws 
IOException;
+
+  /**
+   * Return the current number of bytes that have been read.
+   *
+   * @return number of bytes read
+   */
+  long bytesRead();
+
+  /**
+   * Close any resource opened by the message reader, not including message body allocations.
+   *
+   * @throws IOException
+   */
+  void close() throws IOException;
+}
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
similarity index 86%
rename from 
java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
rename to 
java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
index c397cec72..e2f8f7d9a 100644
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.stream;
+package org.apache.arrow.vector.ipc.message;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -31,14 +31,8 @@
 import org.apache.arrow.flatbuf.MetadataVersion;
 import org.apache.arrow.flatbuf.RecordBatch;
 import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.file.ArrowBlock;
-import org.apache.arrow.vector.file.ReadChannel;
-import org.apache.arrow.vector.file.WriteChannel;
-import org.apache.arrow.vector.schema.ArrowBuffer;
-import org.apache.arrow.vector.schema.ArrowDictionaryBatch;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
-import org.apache.arrow.vector.schema.ArrowMessage;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.ReadChannel;
+import org.apache.arrow.vector.ipc.WriteChannel;
 import org.apache.arrow.vector.types.pojo.Schema;
 
 import com.google.flatbuffers.FlatBufferBuilder;
@@ -102,12 +96,12 @@ public static long serialize(WriteChannel out, Schema 
schema) throws IOException
   /**
    * Deserializes a schema object. Format is from serialize().
    *
-   * @param in the channel to deserialize from
+   * @param reader the reader interface to deserialize from
    * @return the deserialized object
    * @throws IOException if something went wrong
    */
-  public static Schema deserializeSchema(ReadChannel in) throws IOException {
-    Message message = deserializeMessage(in);
+  public static Schema deserializeSchema(MessageReader reader) throws IOException {
+    Message message = reader.readNextMessage();
     if (message == null) {
       throw new IOException("Unexpected end of input. Missing schema.");
     }
@@ -119,6 +113,16 @@ public static Schema deserializeSchema(ReadChannel in) 
throws IOException {
         message.header(new org.apache.arrow.flatbuf.Schema()));
   }
 
+  /**
+   * Deserializes a schema object. Format is from serialize().
+   *
+   * @param in the channel to deserialize from
+   * @return the deserialized object
+   * @throws IOException if something went wrong
+   */
+  public static Schema deserializeSchema(ReadChannel in) throws IOException {
+    return deserializeSchema(new MessageChannelReader(in));
+  }
 
   /**
    * Serializes an ArrowRecordBatch. Returns the offset and length of the 
written batch.
@@ -184,25 +188,20 @@ public static long writeBatchBuffers(WriteChannel out, 
ArrowRecordBatch batch) t
   }
 
   /**
-   * Deserializes a RecordBatch
+   * Deserializes a RecordBatch.
    *
-   * @param in      the channel to deserialize from
+   * @param reader  the reader interface to deserialize from
    * @param message the object to derialize to
    * @param alloc   to allocate buffers
    * @return the deserialized object
    * @throws IOException if something went wrong
    */
-  public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in, 
Message message, BufferAllocator alloc)
+  public static ArrowRecordBatch deserializeRecordBatch(MessageReader reader, Message message, BufferAllocator alloc)
       throws IOException {
     RecordBatch recordBatchFB = (RecordBatch) message.header(new 
RecordBatch());
 
-    int bodyLength = (int) message.bodyLength();
-
     // Now read the record batch body
-    ArrowBuf buffer = alloc.buffer(bodyLength);
-    if (in.readFully(buffer, bodyLength) != bodyLength) {
-      throw new IOException("Unexpected end of input trying to read batch.");
-    }
+    ArrowBuf buffer = reader.readMessageBody(message, alloc);
     return deserializeRecordBatch(recordBatchFB, buffer);
   }
 
@@ -243,7 +242,14 @@ public static ArrowRecordBatch 
deserializeRecordBatch(ReadChannel in, ArrowBlock
     return deserializeRecordBatch(recordBatchFB, body);
   }
 
-  // Deserializes a record batch given the Flatbuffer metadata and in-memory 
body
+  /**
+   * Deserializes a record batch given the Flatbuffer metadata and in-memory body.
+   *
+   * @param recordBatchFB Deserialized FlatBuffer record batch
+   * @param body Read body of the record batch
+   * @return ArrowRecordBatch from metadata and in-memory body
+   * @throws IOException
+   */
   public static ArrowRecordBatch deserializeRecordBatch(RecordBatch 
recordBatchFB,
                                                         ArrowBuf body) throws 
IOException {
     // Now read the body
@@ -314,26 +320,21 @@ public static ArrowBlock serialize(WriteChannel out, 
ArrowDictionaryBatch batch)
   }
 
   /**
-   * Deserializes a DictionaryBatch
+   * Deserializes a DictionaryBatch.
    *
-   * @param in      where to read from
+   * @param reader  where to read from
    * @param message the message message metadata to deserialize
    * @param alloc   the allocator for new buffers
    * @return the corresponding dictionary batch
    * @throws IOException if something went wrong
    */
-  public static ArrowDictionaryBatch deserializeDictionaryBatch(ReadChannel in,
+  public static ArrowDictionaryBatch deserializeDictionaryBatch(MessageReader reader,
                                                                 Message 
message,
                                                                 
BufferAllocator alloc) throws IOException {
     DictionaryBatch dictionaryBatchFB = (DictionaryBatch) message.header(new 
DictionaryBatch());
 
-    int bodyLength = (int) message.bodyLength();
-
     // Now read the record batch body
-    ArrowBuf body = alloc.buffer(bodyLength);
-    if (in.readFully(body, bodyLength) != bodyLength) {
-      throw new IOException("Unexpected end of input trying to read batch.");
-    }
+    ArrowBuf body = reader.readMessageBody(message, alloc);
     ArrowRecordBatch recordBatch = 
deserializeRecordBatch(dictionaryBatchFB.data(), body);
     return new ArrowDictionaryBatch(dictionaryBatchFB.id(), recordBatch);
   }
@@ -377,8 +378,16 @@ public static ArrowDictionaryBatch 
deserializeDictionaryBatch(ReadChannel in,
     return new ArrowDictionaryBatch(dictionaryBatchFB.id(), recordBatch);
   }
 
-  public static ArrowMessage deserializeMessageBatch(ReadChannel in, 
BufferAllocator alloc) throws IOException {
-    Message message = deserializeMessage(in);
+  /**
+   * Deserialize a message that is either an ArrowDictionaryBatch or ArrowRecordBatch.
+   *
+   * @param reader Interface to read messages from
+   * @param alloc Allocator for message data
+   * @return The deserialized record batch
+   * @throws IOException if the message is not an ArrowDictionaryBatch or ArrowRecordBatch
+   */
+  public static ArrowMessage deserializeMessageBatch(MessageReader reader, BufferAllocator alloc) throws IOException {
+    Message message = reader.readNextMessage();
     if (message == null) {
       return null;
     } else if (message.bodyLength() > Integer.MAX_VALUE) {
@@ -391,14 +400,26 @@ public static ArrowMessage 
deserializeMessageBatch(ReadChannel in, BufferAllocat
 
     switch (message.headerType()) {
       case MessageHeader.RecordBatch:
-        return deserializeRecordBatch(in, message, alloc);
+        return deserializeRecordBatch(reader, message, alloc);
       case MessageHeader.DictionaryBatch:
-        return deserializeDictionaryBatch(in, message, alloc);
+        return deserializeDictionaryBatch(reader, message, alloc);
       default:
         throw new IOException("Unexpected message header type " + 
message.headerType());
     }
   }
 
+  /**
+   * Deserialize a message that is either an ArrowDictionaryBatch or ArrowRecordBatch.
+   *
+   * @param in ReadChannel to read messages from
+   * @param alloc Allocator for message data
+   * @return The deserialized record batch
+   * @throws IOException if the message is not an ArrowDictionaryBatch or ArrowRecordBatch
+   */
+  public static ArrowMessage deserializeMessageBatch(ReadChannel in, BufferAllocator alloc) throws IOException {
+    return deserializeMessageBatch(new MessageChannelReader(in), alloc);
+  }
+
   /**
    * Serializes a message header.
    *
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/schema/TypeLayout.java 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/TypeLayout.java
similarity index 80%
rename from 
java/vector/src/main/java/org/apache/arrow/vector/schema/TypeLayout.java
rename to 
java/vector/src/main/java/org/apache/arrow/vector/ipc/message/TypeLayout.java
index 29407bf1a..06fe94816 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/schema/TypeLayout.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/TypeLayout.java
@@ -16,15 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.schema;
+package org.apache.arrow.vector.ipc.message;
 
 import static java.util.Arrays.asList;
-import static org.apache.arrow.vector.schema.VectorLayout.booleanVector;
-import static org.apache.arrow.vector.schema.VectorLayout.byteVector;
-import static org.apache.arrow.vector.schema.VectorLayout.dataVector;
-import static org.apache.arrow.vector.schema.VectorLayout.offsetVector;
-import static org.apache.arrow.vector.schema.VectorLayout.typeVector;
-import static org.apache.arrow.vector.schema.VectorLayout.validityVector;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -64,7 +58,7 @@ public static TypeLayout getTypeLayout(final ArrowType 
arrowType) {
 
       @Override
       public TypeLayout visit(Int type) {
-        return newFixedWidthTypeLayout(dataVector(type.getBitWidth()));
+        return 
newFixedWidthTypeLayout(VectorLayout.dataVector(type.getBitWidth()));
       }
 
       @Override
@@ -74,14 +68,14 @@ public TypeLayout visit(Union type) {
           case Dense:
             vectors = asList(
                 // TODO: validate this
-                validityVector(),
-                typeVector(),
-                offsetVector() // offset to find the vector
+                VectorLayout.validityVector(),
+                VectorLayout.typeVector(),
+                VectorLayout.offsetVector() // offset to find the vector
             );
             break;
           case Sparse:
             vectors = asList(
-                typeVector() // type of the value at the index or 0 if null
+                VectorLayout.typeVector() // type of the value at the index or 
0 if null
             );
             break;
           default:
@@ -93,21 +87,21 @@ public TypeLayout visit(Union type) {
       @Override
       public TypeLayout visit(Struct type) {
         List<VectorLayout> vectors = asList(
-            validityVector()
+            VectorLayout.validityVector()
         );
         return new TypeLayout(vectors);
       }
 
       @Override
       public TypeLayout visit(Timestamp type) {
-        return newFixedWidthTypeLayout(dataVector(64));
+        return newFixedWidthTypeLayout(VectorLayout.dataVector(64));
       }
 
       @Override
       public TypeLayout 
visit(org.apache.arrow.vector.types.pojo.ArrowType.List type) {
         List<VectorLayout> vectors = asList(
-            validityVector(),
-            offsetVector()
+            VectorLayout.validityVector(),
+            VectorLayout.offsetVector()
         );
         return new TypeLayout(vectors);
       }
@@ -115,7 +109,7 @@ public TypeLayout 
visit(org.apache.arrow.vector.types.pojo.ArrowType.List type)
       @Override
       public TypeLayout visit(FixedSizeList type) {
         List<VectorLayout> vectors = asList(
-            validityVector()
+            VectorLayout.validityVector()
         );
         return new TypeLayout(vectors);
       }
@@ -136,18 +130,18 @@ public TypeLayout visit(FloatingPoint type) {
           default:
             throw new UnsupportedOperationException("Unsupported Precision: " 
+ type.getPrecision());
         }
-        return newFixedWidthTypeLayout(dataVector(bitWidth));
+        return newFixedWidthTypeLayout(VectorLayout.dataVector(bitWidth));
       }
 
       @Override
       public TypeLayout visit(Decimal type) {
         // TODO: check size
-        return newFixedWidthTypeLayout(dataVector(64)); // actually depends on 
the type fields
+        return newFixedWidthTypeLayout(VectorLayout.dataVector(64)); // 
actually depends on the type fields
       }
 
       @Override
       public TypeLayout visit(Bool type) {
-        return newFixedWidthTypeLayout(booleanVector());
+        return newFixedWidthTypeLayout(VectorLayout.booleanVector());
       }
 
       @Override
@@ -161,7 +155,7 @@ public TypeLayout visit(Utf8 type) {
       }
 
       private TypeLayout newVariableWidthTypeLayout() {
-        return newPrimitiveTypeLayout(validityVector(), offsetVector(), 
byteVector());
+        return newPrimitiveTypeLayout(VectorLayout.validityVector(), 
VectorLayout.offsetVector(), VectorLayout.byteVector());
       }
 
       private TypeLayout newPrimitiveTypeLayout(VectorLayout... vectors) {
@@ -169,7 +163,7 @@ private TypeLayout newPrimitiveTypeLayout(VectorLayout... 
vectors) {
       }
 
       public TypeLayout newFixedWidthTypeLayout(VectorLayout dataVector) {
-        return newPrimitiveTypeLayout(validityVector(), dataVector);
+        return newPrimitiveTypeLayout(VectorLayout.validityVector(), 
dataVector);
       }
 
       @Override
@@ -179,21 +173,21 @@ public TypeLayout visit(Null type) {
 
       @Override
       public TypeLayout visit(Date type) {
-        return newFixedWidthTypeLayout(dataVector(64));
+        return newFixedWidthTypeLayout(VectorLayout.dataVector(64));
       }
 
       @Override
       public TypeLayout visit(Time type) {
-        return newFixedWidthTypeLayout(dataVector(type.getBitWidth()));
+        return 
newFixedWidthTypeLayout(VectorLayout.dataVector(type.getBitWidth()));
       }
 
       @Override
       public TypeLayout visit(Interval type) { // TODO: check size
         switch (type.getUnit()) {
           case DAY_TIME:
-            return newFixedWidthTypeLayout(dataVector(64));
+            return newFixedWidthTypeLayout(VectorLayout.dataVector(64));
           case YEAR_MONTH:
-            return newFixedWidthTypeLayout(dataVector(64));
+            return newFixedWidthTypeLayout(VectorLayout.dataVector(64));
           default:
             throw new UnsupportedOperationException("Unknown unit " + 
type.getUnit());
         }
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/schema/VectorLayout.java 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/VectorLayout.java
similarity index 89%
rename from 
java/vector/src/main/java/org/apache/arrow/vector/schema/VectorLayout.java
rename to 
java/vector/src/main/java/org/apache/arrow/vector/ipc/message/VectorLayout.java
index 0871baf38..e4f2f98fd 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/schema/VectorLayout.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/VectorLayout.java
@@ -16,12 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.schema;
-
-import static org.apache.arrow.vector.schema.ArrowVectorType.DATA;
-import static org.apache.arrow.vector.schema.ArrowVectorType.OFFSET;
-import static org.apache.arrow.vector.schema.ArrowVectorType.TYPE;
-import static org.apache.arrow.vector.schema.ArrowVectorType.VALIDITY;
+package org.apache.arrow.vector.ipc.message;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -30,14 +25,14 @@
 
 public class VectorLayout implements FBSerializable {
 
-  private static final VectorLayout VALIDITY_VECTOR = new 
VectorLayout(VALIDITY, 1);
-  private static final VectorLayout OFFSET_VECTOR = new VectorLayout(OFFSET, 
32);
-  private static final VectorLayout TYPE_VECTOR = new VectorLayout(TYPE, 32);
-  private static final VectorLayout BOOLEAN_VECTOR = new VectorLayout(DATA, 1);
-  private static final VectorLayout VALUES_64 = new VectorLayout(DATA, 64);
-  private static final VectorLayout VALUES_32 = new VectorLayout(DATA, 32);
-  private static final VectorLayout VALUES_16 = new VectorLayout(DATA, 16);
-  private static final VectorLayout VALUES_8 = new VectorLayout(DATA, 8);
+  private static final VectorLayout VALIDITY_VECTOR = new 
VectorLayout(ArrowVectorType.VALIDITY, 1);
+  private static final VectorLayout OFFSET_VECTOR = new 
VectorLayout(ArrowVectorType.OFFSET, 32);
+  private static final VectorLayout TYPE_VECTOR = new 
VectorLayout(ArrowVectorType.TYPE, 32);
+  private static final VectorLayout BOOLEAN_VECTOR = new 
VectorLayout(ArrowVectorType.DATA, 1);
+  private static final VectorLayout VALUES_64 = new 
VectorLayout(ArrowVectorType.DATA, 64);
+  private static final VectorLayout VALUES_32 = new 
VectorLayout(ArrowVectorType.DATA, 32);
+  private static final VectorLayout VALUES_16 = new 
VectorLayout(ArrowVectorType.DATA, 16);
+  private static final VectorLayout VALUES_8 = new 
VectorLayout(ArrowVectorType.DATA, 8);
 
   public static VectorLayout typeVector() {
     return TYPE_VECTOR;
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamReader.java
 
b/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamReader.java
deleted file mode 100644
index 5b6300076..000000000
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamReader.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.arrow.vector.stream;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.file.ArrowReader;
-import org.apache.arrow.vector.file.ReadChannel;
-import org.apache.arrow.vector.schema.ArrowMessage;
-import org.apache.arrow.vector.types.pojo.Schema;
-
-/**
- * This classes reads from an input stream and produces ArrowRecordBatches.
- */
-public class ArrowStreamReader extends ArrowReader<ReadChannel> {
-
-  /**
-   * Constructs a streaming read, reading bytes from 'in'. Non-blocking.
-   *
-   * @param in        the stream to read from
-   * @param allocator to allocate new buffers
-   */
-  public ArrowStreamReader(ReadableByteChannel in, BufferAllocator allocator) {
-    super(new ReadChannel(in), allocator);
-  }
-
-  public ArrowStreamReader(InputStream in, BufferAllocator allocator) {
-    this(Channels.newChannel(in), allocator);
-  }
-
-  /**
-   * Reads the schema message from the beginning of the stream.
-   *
-   * @param in to allocate new buffers
-   * @return the deserialized arrow schema
-   */
-  @Override
-  protected Schema readSchema(ReadChannel in) throws IOException {
-    return MessageSerializer.deserializeSchema(in);
-  }
-
-  @Override
-  protected ArrowMessage readMessage(ReadChannel in, BufferAllocator 
allocator) throws IOException {
-    return MessageSerializer.deserializeMessageBatch(in, allocator);
-  }
-}
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java 
b/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java
index eba149bf7..574612833 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java
@@ -43,8 +43,8 @@
 import org.apache.arrow.flatbuf.Type;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.schema.TypeLayout;
-import org.apache.arrow.vector.schema.VectorLayout;
+import org.apache.arrow.vector.ipc.message.VectorLayout;
+import org.apache.arrow.vector.ipc.message.TypeLayout;
 import org.apache.arrow.vector.types.pojo.ArrowType.Int;
 
 public class Field {
@@ -117,9 +117,9 @@ public static Field 
convertField(org.apache.arrow.flatbuf.Field field) {
       }
       dictionary = new DictionaryEncoding(dictionaryFB.id(), 
dictionaryFB.isOrdered(), indexType);
     }
-    ImmutableList.Builder<org.apache.arrow.vector.schema.VectorLayout> layout 
= ImmutableList.builder();
+    ImmutableList.Builder<VectorLayout> layout = ImmutableList.builder();
     for (int i = 0; i < field.layoutLength(); ++i) {
-      layout.add(new 
org.apache.arrow.vector.schema.VectorLayout(field.layout(i)));
+      layout.add(new VectorLayout(field.layout(i)));
     }
     ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
     for (int i = 0; i < field.childrenLength(); i++) {
diff --git 
a/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java 
b/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java
index c7ee202f9..f51a87436 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java
@@ -17,7 +17,6 @@
  */
 
 package org.apache.arrow.vector;
-import org.apache.arrow.vector.holders.VarCharHolder;
 import org.apache.arrow.vector.util.OversizedAllocationException;
 
 import static org.apache.arrow.vector.TestUtils.newNullableVarBinaryVector;
@@ -38,15 +37,14 @@
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
-import org.apache.arrow.vector.schema.TypeLayout;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.TypeLayout;
 import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.util.TransferPair;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
diff --git 
a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java 
b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java
index 3853eecac..e61dbecf4 100644
--- 
a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java
+++ 
b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java
@@ -39,8 +39,8 @@
 import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter;
 import org.apache.arrow.vector.complex.writer.BigIntWriter;
 import org.apache.arrow.vector.complex.writer.IntWriter;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
diff --git 
a/java/vector/src/test/java/org/apache/arrow/vector/file/BaseFileTest.java 
b/java/vector/src/test/java/org/apache/arrow/vector/ipc/BaseFileTest.java
similarity index 99%
rename from 
java/vector/src/test/java/org/apache/arrow/vector/file/BaseFileTest.java
rename to 
java/vector/src/test/java/org/apache/arrow/vector/ipc/BaseFileTest.java
index 874ba99e2..233b682c9 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/file/BaseFileTest.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/BaseFileTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
diff --git 
a/java/vector/src/test/java/org/apache/arrow/vector/stream/MessageSerializerTest.java
 
b/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java
similarity index 94%
rename from 
java/vector/src/test/java/org/apache/arrow/vector/stream/MessageSerializerTest.java
rename to 
java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java
index f968768f5..239d3034a 100644
--- 
a/java/vector/src/test/java/org/apache/arrow/vector/stream/MessageSerializerTest.java
+++ 
b/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.stream;
+package org.apache.arrow.vector.ipc;
 
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertArrayEquals;
@@ -33,12 +33,11 @@
 import io.netty.buffer.ArrowBuf;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.file.ArrowBlock;
-import org.apache.arrow.vector.file.ReadChannel;
-import org.apache.arrow.vector.file.WriteChannel;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
-import org.apache.arrow.vector.schema.ArrowMessage;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.MessageSerializer;
+import org.apache.arrow.vector.ipc.message.ArrowBlock;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowMessage;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
 import org.apache.arrow.vector.types.pojo.Field;
diff --git 
a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java 
b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowFile.java
similarity index 94%
rename from 
java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
rename to 
java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowFile.java
index 8559969a2..4387db036 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowFile.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -40,16 +40,14 @@
 import org.apache.arrow.vector.NullableIntVector;
 import org.apache.arrow.vector.NullableTinyIntVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
 import org.apache.arrow.vector.complex.FixedSizeListVector;
 import org.apache.arrow.vector.complex.MapVector;
 import org.apache.arrow.vector.complex.NullableMapVector;
 import 
org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider;
-import org.apache.arrow.vector.schema.ArrowBuffer;
-import org.apache.arrow.vector.schema.ArrowMessage;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
-import org.apache.arrow.vector.stream.ArrowStreamReader;
-import org.apache.arrow.vector.stream.ArrowStreamWriter;
-import org.apache.arrow.vector.stream.MessageSerializerTest;
+import org.apache.arrow.vector.ipc.message.ArrowBlock;
+import org.apache.arrow.vector.ipc.message.ArrowBuffer;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
 import org.apache.arrow.vector.types.FloatingPointPrecision;
 import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.arrow.vector.types.pojo.ArrowType;
@@ -108,52 +106,41 @@ public void testWriteRead() throws IOException {
     // read
     try (BufferAllocator readerAllocator = 
allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
          FileInputStream fileInputStream = new FileInputStream(file);
-         ArrowFileReader arrowReader = new 
ArrowFileReader(fileInputStream.getChannel(), readerAllocator) {
-           @Override
-           protected ArrowMessage readMessage(SeekableReadChannel in, 
BufferAllocator allocator) throws IOException {
-             ArrowMessage message = super.readMessage(in, allocator);
-             if (message != null) {
-               ArrowRecordBatch batch = (ArrowRecordBatch) message;
-               List<ArrowBuffer> buffersLayout = batch.getBuffersLayout();
-               for (ArrowBuffer arrowBuffer : buffersLayout) {
-                 Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
-               }
-             }
-             return message;
-           }
-         }) {
-      Schema schema = arrowReader.getVectorSchemaRoot().getSchema();
-      LOGGER.debug("reading schema: " + schema);
+         ArrowFileReader arrowReader = new 
ArrowFileReader(fileInputStream.getChannel(), readerAllocator)) {
+
       VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      VectorUnloader unloader = new VectorUnloader(root);
+      Schema schema = root.getSchema();
+      LOGGER.debug("reading schema: " + schema);
       for (ArrowBlock rbBlock : arrowReader.getRecordBlocks()) {
         arrowReader.loadRecordBatch(rbBlock);
         Assert.assertEquals(count, root.getRowCount());
+        ArrowRecordBatch batch = unloader.getRecordBatch();
+        List<ArrowBuffer> buffersLayout = batch.getBuffersLayout();
+        for (ArrowBuffer arrowBuffer : buffersLayout) {
+          Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
+        }
         validateContent(count, root);
+        batch.close();
       }
     }
 
     // Read from stream.
     try (BufferAllocator readerAllocator = 
allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
          ByteArrayInputStream input = new 
ByteArrayInputStream(stream.toByteArray());
-         ArrowStreamReader arrowReader = new ArrowStreamReader(input, 
readerAllocator) {
-           @Override
-           protected ArrowMessage readMessage(ReadChannel in, BufferAllocator 
allocator) throws IOException {
-             ArrowMessage message = super.readMessage(in, allocator);
-             if (message != null) {
-               ArrowRecordBatch batch = (ArrowRecordBatch) message;
-               List<ArrowBuffer> buffersLayout = batch.getBuffersLayout();
-               for (ArrowBuffer arrowBuffer : buffersLayout) {
-                 Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
-               }
-             }
-             return message;
-           }
-         }) {
+         ArrowStreamReader arrowReader = new ArrowStreamReader(input, 
readerAllocator)) {
 
       VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      VectorUnloader unloader = new VectorUnloader(root);
       Schema schema = root.getSchema();
       LOGGER.debug("reading schema: " + schema);
       Assert.assertTrue(arrowReader.loadNextBatch());
+      ArrowRecordBatch batch = unloader.getRecordBatch();
+      List<ArrowBuffer> buffersLayout = batch.getBuffersLayout();
+      for (ArrowBuffer arrowBuffer : buffersLayout) {
+        Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
+      }
+      batch.close();
       Assert.assertEquals(count, root.getRowCount());
       validateContent(count, root);
     }
diff --git 
a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFooter.java 
b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowFooter.java
similarity index 93%
rename from 
java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFooter.java
rename to 
java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowFooter.java
index 461246532..235e8c164 100644
--- 
a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFooter.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowFooter.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc;
 
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
@@ -27,6 +27,8 @@
 import java.util.List;
 
 import org.apache.arrow.flatbuf.Footer;
+import org.apache.arrow.vector.ipc.message.ArrowBlock;
+import org.apache.arrow.vector.ipc.message.ArrowFooter;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
diff --git 
a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java
 
b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java
similarity index 85%
rename from 
java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java
rename to 
java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java
index 3ce01a268..49e194b51 100644
--- 
a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java
+++ 
b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc;
 
 import static java.nio.channels.Channels.newChannel;
 import static java.util.Arrays.asList;
@@ -37,9 +37,15 @@
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.TestUtils;
+import org.apache.arrow.vector.VectorLoader;
 import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.ipc.ArrowFileReader;
+import org.apache.arrow.vector.ipc.ArrowFileWriter;
+import org.apache.arrow.vector.ipc.SeekableReadChannel;
+import org.apache.arrow.vector.ipc.message.ArrowBlock;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
@@ -87,7 +93,10 @@ public void test() throws IOException {
          ArrowFileWriter writer = new ArrowFileWriter(root, null, 
newChannel(out))) {
       ArrowBuf validityb = buf(validity);
       ArrowBuf valuesb = buf(values);
-      writer.writeRecordBatch(new ArrowRecordBatch(16, asList(new 
ArrowFieldNode(16, 8)), asList(validityb, valuesb)));
+      ArrowRecordBatch batch = new ArrowRecordBatch(16, asList(new 
ArrowFieldNode(16, 8)), asList(validityb, valuesb));
+      VectorLoader loader = new VectorLoader(root);
+      loader.load(batch);
+      writer.writeBatch();
     }
 
     byte[] byteArray = out.toByteArray();
@@ -100,7 +109,9 @@ public void test() throws IOException {
       // TODO: dictionaries
       List<ArrowBlock> recordBatches = reader.getRecordBlocks();
       assertEquals(1, recordBatches.size());
-      ArrowRecordBatch recordBatch = (ArrowRecordBatch) 
reader.readMessage(channel, allocator);
+      reader.loadNextBatch();
+      VectorUnloader unloader = new 
VectorUnloader(reader.getVectorSchemaRoot());
+      ArrowRecordBatch recordBatch = unloader.getRecordBatch();
       List<ArrowFieldNode> nodes = recordBatch.getNodes();
       assertEquals(1, nodes.size());
       ArrowFieldNode node = nodes.get(0);
diff --git 
a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStream.java 
b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStream.java
similarity index 88%
rename from 
java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStream.java
rename to 
java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStream.java
index c7e34191a..7a8586a9e 100644
--- 
a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStream.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStream.java
@@ -16,9 +16,8 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc;
 
-import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -27,16 +26,12 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 
-import io.netty.buffer.ArrowBuf;
-import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.NullableTinyIntVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
-import org.apache.arrow.vector.schema.ArrowMessage;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
-import org.apache.arrow.vector.stream.ArrowStreamReader;
-import org.apache.arrow.vector.stream.ArrowStreamWriter;
-import org.apache.arrow.vector.stream.MessageSerializerTest;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+import org.apache.arrow.vector.ipc.BaseFileTest;
+import org.apache.arrow.vector.ipc.MessageSerializerTest;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.junit.Assert;
 import org.junit.Test;
diff --git 
a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java
 
b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStreamPipe.java
similarity index 78%
rename from 
java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java
rename to 
java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStreamPipe.java
index f393733cc..65e6cea2e 100644
--- 
a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java
+++ 
b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStreamPipe.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -30,10 +30,9 @@
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.NullableTinyIntVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.schema.ArrowMessage;
-import org.apache.arrow.vector.stream.ArrowStreamReader;
-import org.apache.arrow.vector.stream.ArrowStreamWriter;
-import org.apache.arrow.vector.stream.MessageSerializerTest;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+import org.apache.arrow.vector.ipc.MessageSerializerTest;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.junit.Assert;
 import org.junit.Test;
@@ -95,37 +94,28 @@ public long bytesWritten() {
     public ReaderThread(ReadableByteChannel sourceChannel)
         throws IOException {
       reader = new ArrowStreamReader(sourceChannel, alloc) {
-        @Override
-        protected ArrowMessage readMessage(ReadChannel in, BufferAllocator 
allocator) throws IOException {
-          // Read all the batches. Each batch contains an incrementing id and 
then some
-          // constant data. Verify both.
-          ArrowMessage message = super.readMessage(in, allocator);
-          if (message == null) {
-            done = true;
-          } else {
-            batchesRead++;
-          }
-          return message;
-        }
 
         @Override
         public boolean loadNextBatch() throws IOException {
-          if (!super.loadNextBatch()) {
+          if (super.loadNextBatch()) {
+            batchesRead++;
+          } else {
+            done = true;
             return false;
           }
-          if (!done) {
-            VectorSchemaRoot root = getVectorSchemaRoot();
-            Assert.assertEquals(16, root.getRowCount());
-            NullableTinyIntVector vector = (NullableTinyIntVector) 
root.getFieldVectors().get(0);
-            Assert.assertEquals((byte) (batchesRead - 1), vector.get(0));
-            for (int i = 1; i < 16; i++) {
-              if (i < 8) {
-                Assert.assertEquals((byte) (i + 1), vector.get(i));
-              } else {
-                Assert.assertTrue(vector.isNull(i));
-              }
+
+          VectorSchemaRoot root = getVectorSchemaRoot();
+          Assert.assertEquals(16, root.getRowCount());
+          NullableTinyIntVector vector = (NullableTinyIntVector) 
root.getFieldVectors().get(0);
+          Assert.assertEquals((byte) (batchesRead - 1), vector.get(0));
+          for (int i = 1; i < 16; i++) {
+            if (i < 8) {
+              Assert.assertEquals((byte) (i + 1), vector.get(i));
+            } else {
+              Assert.assertTrue(vector.isNull(i));
             }
           }
+
           return true;
         }
       };
@@ -139,7 +129,7 @@ public void run() {
             
reader.getVectorSchemaRoot().getSchema().getFields().get(0).getTypeLayout().getVectorTypes().toString(),
             
reader.getVectorSchemaRoot().getSchema().getFields().get(0).getTypeLayout().getVectors().size()
 > 0);
         while (!done) {
-          assertTrue(reader.loadNextBatch());
+          assertTrue(reader.loadNextBatch() != done);
         }
         reader.close();
       } catch (IOException e) {
diff --git 
a/java/vector/src/test/java/org/apache/arrow/vector/file/json/TestJSONFile.java 
b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestJSONFile.java
similarity index 99%
rename from 
java/vector/src/test/java/org/apache/arrow/vector/file/json/TestJSONFile.java
rename to 
java/vector/src/test/java/org/apache/arrow/vector/ipc/TestJSONFile.java
index 5c4c48cd2..c3e0b7951 100644
--- 
a/java/vector/src/test/java/org/apache/arrow/vector/file/json/TestJSONFile.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestJSONFile.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file.json;
+package org.apache.arrow.vector.ipc;
 
 import java.io.File;
 import java.io.IOException;
@@ -28,7 +28,6 @@
 import org.apache.arrow.vector.complex.NullableMapVector;
 import org.apache.arrow.vector.dictionary.DictionaryProvider;
 import 
org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider;
-import org.apache.arrow.vector.file.BaseFileTest;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.arrow.vector.util.Validator;
 import org.junit.Assert;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> [Java] Add generalized stream writer and reader interfaces that are decoupled 
> from IO / message framing
> -------------------------------------------------------------------------------------------------------
>
>                 Key: ARROW-1047
>                 URL: https://issues.apache.org/jira/browse/ARROW-1047
>             Project: Apache Arrow
>          Issue Type: New Feature
>          Components: Java - Vectors
>            Reporter: Wes McKinney
>            Assignee: Bryan Cutler
>              Labels: pull-request-available
>             Fix For: 0.8.0
>
>
> cc [~julienledem] [~elahrvivaz] [~nongli]
> The ArrowWriter 
> https://github.com/apache/arrow/blob/master/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java
>  accepts a WritableByteChannel to which the stream is written.
> It would be useful to be able to support other kinds of message framing and 
> transport, like GRPC or HTTP. So rather than writing a complete Arrow stream 
> as a single contiguous byte stream, the component messages (schema, 
> dictionaries, and record batches) would be framed as separate messages in the 
> underlying protocol. 
> So if we were using ProtocolBuffers and gRPC as the underlying transport for 
> the stream, we could encapsulate components of an Arrow stream in objects 
> like:
> {code:language=protobuf}
> message ArrowMessagePB {
>   required bytes serialized_data;
> }
> {code}
> If the transport supports zero copy, that is obviously better than 
> serializing then parsing a protocol buffer.
> We should do this work in C++ as well to support more flexible stream 
> transport. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to