This is an automated email from the ASF dual-hosted git repository.
emkornfield pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new ecbe116 ARROW-5658: [JAVA] Sync schema for VectorSchemaRoot
ecbe116 is described below
commit ecbe116b778aa02488d1a24c7073ebf7079f5bbe
Author: liyafan82 <[email protected]>
AuthorDate: Tue Jul 2 21:48:06 2019 -0700
ARROW-5658: [JAVA] Sync schema for VectorSchemaRoot
Resolve JIRA ARROW-5658.
The fundamental problem is that, as data are inserted into a vector (e.g.
ListVector), the schema of the VectorSchemaRoot can diverge from the actual
vector structure.
On the server side, deserialization is based on that schema, which is
out of date, so it fails silently.
In this PR, we fix the obsolete-schema problem and make the server log the
error explicitly.
Author: liyafan82 <[email protected]>
Closes #4689 from liyafan82/fly_5658 and squashes the following commits:
e7d5865a5 <liyafan82> Undo throwing exception
3ea77bc76 <liyafan82> Replace automatic updating schema with throwing an
exception
cb9da2032 <liyafan82> Merge branch 'master' into fly_5658
47a776fbe <liyafan82> Automatically update schema
6d2763848 <liyafan82> Resolve comments
061e8bc2f <liyafan82> Fix error log
e8ea49f00 <liyafan82> Sync schema for VectorSchemaRoot
---
.../org/apache/arrow/flight/FlightService.java | 1 +
.../org/apache/arrow/vector/VectorSchemaRoot.java | 22 ++++-
.../apache/arrow/vector/TestVectorSchemaRoot.java | 93 ++++++++++++++++++++++
3 files changed, 115 insertions(+), 1 deletion(-)
diff --git
a/java/flight/src/main/java/org/apache/arrow/flight/FlightService.java
b/java/flight/src/main/java/org/apache/arrow/flight/FlightService.java
index ee45cef..e805917 100644
--- a/java/flight/src/main/java/org/apache/arrow/flight/FlightService.java
+++ b/java/flight/src/main/java/org/apache/arrow/flight/FlightService.java
@@ -191,6 +191,7 @@ class FlightService extends FlightServiceImplBase {
StreamPipe.wrap(responseObserver, PutResult::toProtocol)).run();
responseObserver.onCompleted();
} catch (Exception ex) {
+ logger.error("Failed to process custom put.", ex);
responseObserver.onError(ex);
// The client may have terminated, so the exception here is effectively swallowed.
// Log the error as well so -something- makes it to the developer.
diff --git
a/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java
b/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java
index 373e03c..a3fab14 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java
@@ -35,7 +35,7 @@ import org.apache.arrow.vector.types.pojo.Schema;
*/
public class VectorSchemaRoot implements AutoCloseable {
- private final Schema schema;
+ private Schema schema;
private int rowCount;
private final List<FieldVector> fieldVectors;
private final Map<String, FieldVector> fieldVectorsMap = new HashMap<>();
@@ -206,4 +206,24 @@ public class VectorSchemaRoot implements AutoCloseable {
}
return sb.toString();
}
+
+ /**
+ * Synchronizes the schema from the current vectors.
+ * In some cases, the schema and the actual vector structure may be different.
+ * This can be caused by a promoted writer (For details, please see
+ * {@link org.apache.arrow.vector.complex.impl.PromotableWriter}).
+ * For example, writing different types of data to a {@link org.apache.arrow.vector.complex.ListVector}
+ * may lead to such a case.
+ * When this happens, this method should be called to bring the schema and vector structure into a synchronized state.
+ * @return true if the schema is updated, false otherwise.
+ */
+ public boolean syncSchema() {
+ List<Field> oldFields = this.schema.getFields();
+ List<Field> newFields =
this.fieldVectors.stream().map(ValueVector::getField).collect(Collectors.toList());
+ if (!oldFields.equals(newFields)) {
+ this.schema = new Schema(newFields);
+ return true;
+ }
+ return false;
+ }
}
diff --git
a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorSchemaRoot.java
b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorSchemaRoot.java
index 480dcac..f9525f4 100644
---
a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorSchemaRoot.java
+++
b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorSchemaRoot.java
@@ -17,10 +17,24 @@
package org.apache.arrow.vector;
+import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.impl.UnionListWriter;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -75,4 +89,83 @@ public class TestVectorSchemaRoot {
assertEquals(vec2.getValueCount(), count);
assertEquals(vsr.getRowCount(), count);
}
+
+ private VectorSchemaRoot createBatch() {
+ FieldType varCharType = new FieldType(true, new ArrowType.Utf8(),
/*dictionary=*/null);
+ FieldType listType = new FieldType(true, new ArrowType.List(),
/*dictionary=*/null);
+
+ // create the schema
+ List<Field> schemaFields = new ArrayList<>();
+ Field childField = new Field("varCharCol", varCharType, null);
+ List<Field> childFields = new ArrayList<>();
+ childFields.add(childField);
+ schemaFields.add(new Field("listCol", listType, childFields));
+ Schema schema = new Schema(schemaFields);
+
+ VectorSchemaRoot schemaRoot = VectorSchemaRoot.create(schema, allocator);
+ // get and allocate the vector
+ ListVector vector = (ListVector) schemaRoot.getVector("listCol");
+ vector.allocateNew();
+
+ // write data to the vector
+ UnionListWriter writer = vector.getWriter();
+
+ writer.setPosition(0);
+
+ // write data vector(0)
+ writer.startList();
+
+ // write data vector(0)(0)
+ writer.list().startList();
+
+ // According to the schema above, the list element should have varchar type.
+ // When we write a big int, the original writer cannot handle this, so the writer will
+ // be promoted, and the vector structure will be different from the schema.
+ writer.list().bigInt().writeBigInt(0);
+ writer.list().bigInt().writeBigInt(1);
+ writer.list().endList();
+
+ // write data vector(0)(1)
+ writer.list().startList();
+ writer.list().float8().writeFloat8(3.0D);
+ writer.list().float8().writeFloat8(7.0D);
+ writer.list().endList();
+
+ // finish data vector(0)
+ writer.endList();
+
+ writer.setPosition(1);
+
+ // write data vector(1)
+ writer.startList();
+
+ // write data vector(1)(0)
+ writer.list().startList();
+ writer.list().integer().writeInt(3);
+ writer.list().integer().writeInt(2);
+ writer.list().endList();
+
+ // finish data vector(1)
+ writer.endList();
+
+ vector.setValueCount(2);
+
+ return schemaRoot;
+ }
+
+ @Test
+ public void testSchemaSync() {
+ //create vector schema root
+ try (VectorSchemaRoot schemaRoot = createBatch()) {
+ Schema newSchema = new Schema(
+ schemaRoot.getFieldVectors().stream().map(vec ->
vec.getField()).collect(Collectors.toList()));
+
+ assertNotEquals(newSchema, schemaRoot.getSchema());
+ assertTrue(schemaRoot.syncSchema());
+ assertEquals(newSchema, schemaRoot.getSchema());
+
+ // no schema update this time.
+ assertFalse(schemaRoot.syncSchema());
+ }
+ }
}