Updated Branches:
  refs/heads/master bd41633f1 -> 47985bad0

Added JSONScanPOP and JSONScanBatch


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/73fad99a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/73fad99a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/73fad99a

Branch: refs/heads/master
Commit: 73fad99a752f3f37944e082353aaf790154953a1
Parents: bd41633
Author: Timothy Chen <[email protected]>
Authored: Wed Jun 19 23:21:01 2013 -0700
Committer: Timothy Chen <[email protected]>
Committed: Thu Aug 8 20:46:55 2013 -0700

----------------------------------------------------------------------
 .../templates/RepeatedValueVectors.java         |   7 +-
 .../physical/config/JSONScanBatchCreator.java   |  46 +++++
 .../drill/exec/physical/config/JSONScanPOP.java | 114 ++++++++++++
 .../drill/exec/physical/impl/ImplCreator.java   |  14 +-
 .../apache/drill/exec/schema/DiffSchema.java    |  20 +--
 .../drill/exec/store/JSONRecordReader.java      | 175 +++++++++++--------
 .../apache/drill/exec/store/VectorHolder.java   |  96 +++++-----
 .../drill/exec/vector/FixedWidthVector.java     |   1 -
 .../drill/exec/vector/RepeatedMutator.java      |  23 +++
 .../physical/impl/TestSimpleFragmentRun.java    |  13 +-
 .../resources/physical_json_scan_test1.json     |  23 +++
 11 files changed, 388 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/73fad99a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
 
b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
index 1afe84b..c629a1d 100644
--- 
a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
+++ 
b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
@@ -249,7 +249,7 @@ import com.google.common.collect.Lists;
     }
   }
   
-  public final class Mutator implements ValueVector.Mutator{
+  public final class Mutator implements RepeatedMutator {
 
     
     private Mutator(){
@@ -262,10 +262,7 @@ import com.google.common.collect.Lists;
      * @param index   record of the element to add
      * @param value   value to add to the given row
      */
-    public void add(int index, <#if (type.width > 4)> 
${minor.javaType!type.javaType}
-                               <#elseif type.major == "VarLen"> byte[]
-                               <#else> int
-                               </#if> value) {
+    public void add(int index, <#if type.major == "VarLen">byte[]<#elseif 
(type.width < 4)>int<#else>${minor.javaType!type.javaType}</#if> value) {
       int nextOffset = offsets.getAccessor().get(index+1);
       values.getMutator().set(nextOffset, value);
       offsets.getMutator().set(index+1, nextOffset+1);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/73fad99a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/JSONScanBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/JSONScanBatchCreator.java
 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/JSONScanBatchCreator.java
new file mode 100644
index 0000000..f93f03b
--- /dev/null
+++ 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/JSONScanBatchCreator.java
@@ -0,0 +1,46 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ 
******************************************************************************/
+
+package org.apache.drill.exec.physical.config;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.JSONRecordReader;
+import org.apache.drill.exec.store.RecordReader;
+
+import java.util.List;
+
+public class JSONScanBatchCreator implements BatchCreator<JSONScanPOP> {
+
+    @Override
+    public RecordBatch getBatch(FragmentContext context, JSONScanPOP config, 
List<RecordBatch> children) throws ExecutionSetupException {
+        Preconditions.checkArgument(children.isEmpty());
+        List<JSONScanPOP.ScanEntry> entries = config.getReadEntries();
+        List<RecordReader> readers = Lists.newArrayList();
+        for (JSONScanPOP.ScanEntry e : entries) {
+            readers.add(new JSONRecordReader(context, e.getUrl()));
+        }
+
+        return new ScanBatch(context, readers.iterator());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/73fad99a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/JSONScanPOP.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/JSONScanPOP.java
 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/JSONScanPOP.java
new file mode 100644
index 0000000..1dcf5e1
--- /dev/null
+++ 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/JSONScanPOP.java
@@ -0,0 +1,114 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ 
******************************************************************************/
+
+package org.apache.drill.exec.physical.config;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.ReadEntry;
+import org.apache.drill.exec.physical.base.AbstractScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Scan;
+import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.proto.CoordinationProtos;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+@JsonTypeName("json-scan")
+public class JSONScanPOP extends AbstractScan<JSONScanPOP.ScanEntry> {
+    private static int ESTIMATED_RECORD_SIZE = 1024; // 1kb
+
+    private LinkedList[] mappings;
+
+    @JsonCreator
+    public JSONScanPOP(@JsonProperty("entries") List<JSONScanPOP.ScanEntry> 
readEntries) {
+        super(readEntries);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> 
endpoints) {
+        checkArgument(endpoints.size() <= getReadEntries().size());
+
+        mappings = new LinkedList[endpoints.size()];
+
+        int i = 0;
+        for (ScanEntry e : this.getReadEntries()) {
+            if (i == endpoints.size()) i = 0;
+            LinkedList entries = mappings[i];
+            if (entries == null) {
+                entries = new LinkedList<>();
+                mappings[i] = entries;
+            }
+            entries.add(e);
+            i++;
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Scan<?> getSpecificScan(int minorFragmentId) {
+        checkArgument(minorFragmentId < mappings.length, "Mappings length [%s] 
should be longer than minor fragment id [%s] but it isn't.", mappings.length, 
minorFragmentId);
+        return new JSONScanPOP(mappings[minorFragmentId]);
+    }
+
+    @Override
+    public List<EndpointAffinity> getOperatorAffinity() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public PhysicalOperator getNewWithChildren(List<PhysicalOperator> 
children) {
+        return new JSONScanPOP(readEntries);
+    }
+
+    public static class ScanEntry implements ReadEntry {
+        private final String url;
+        private Size size;
+
+        @JsonCreator
+        public ScanEntry(@JsonProperty("url") String url) {
+            this.url = url;
+            long fileLength = new File(URI.create(url)).length();
+            size = new Size(fileLength / ESTIMATED_RECORD_SIZE, 
ESTIMATED_RECORD_SIZE);
+        }
+
+        @Override
+        public OperatorCost getCost() {
+            return new OperatorCost(1, 1, 2, 2);
+        }
+
+        @Override
+        public Size getSize() {
+            return size;
+        }
+
+        public String getUrl() {
+            return url;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/73fad99a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index c31e9e4..1c15289 100644
--- 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -26,15 +26,7 @@ import 
org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.Scan;
-import org.apache.drill.exec.physical.config.Filter;
-import org.apache.drill.exec.physical.config.MockScanBatchCreator;
-import org.apache.drill.exec.physical.config.MockScanPOP;
-import org.apache.drill.exec.physical.config.Project;
-import org.apache.drill.exec.physical.config.RandomReceiver;
-import org.apache.drill.exec.physical.config.Screen;
-import org.apache.drill.exec.physical.config.SelectionVectorRemover;
-import org.apache.drill.exec.physical.config.SingleSender;
-import org.apache.drill.exec.physical.config.Sort;
+import org.apache.drill.exec.physical.config.*;
 import org.apache.drill.exec.physical.impl.filter.FilterBatchCreator;
 import org.apache.drill.exec.physical.impl.project.ProjectBatchCreator;
 import org.apache.drill.exec.physical.impl.sort.SortBatchCreator;
@@ -74,7 +66,9 @@ public class ImplCreator extends 
AbstractPhysicalVisitor<RecordBatch, FragmentCo
     Preconditions.checkNotNull(context);
     
     if(scan instanceof MockScanPOP){
-      return msc.getBatch(context, (MockScanPOP) scan, 
Collections.<RecordBatch> emptyList());
+      return msc.getBatch(context, (MockScanPOP) scan, 
Collections.<RecordBatch>emptyList());
+    } else if(scan instanceof JSONScanPOP) {
+      return new JSONScanBatchCreator().getBatch(context, (JSONScanPOP)scan, 
Collections.<RecordBatch>emptyList());
     }else{
       return super.visitScan(scan, context);  
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/73fad99a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DiffSchema.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DiffSchema.java
 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DiffSchema.java
index b654a92..68c3e12 100644
--- 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DiffSchema.java
+++ 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DiffSchema.java
@@ -26,6 +26,7 @@ import java.util.List;
 public class DiffSchema {
     List<Field> addedFields;
     List<Field> removedFields;
+    boolean hasChanged = false;
 
     public DiffSchema() {
         this.addedFields = Lists.newArrayList();
@@ -34,27 +35,22 @@ public class DiffSchema {
 
     public void recordNewField(Field field) {
         addedFields.add(field);
-    }
-
-    public boolean hasDiffFields() {
-        return !addedFields.isEmpty() || !removedFields.isEmpty();
-    }
-
-    public List<Field> getAddedFields() {
-        return addedFields;
-    }
-
-    public List<Field> getRemovedFields() {
-        return removedFields;
+        hasChanged = true;
     }
 
     public void reset() {
         addedFields.clear();
         removedFields.clear();
+        hasChanged = false;
     }
 
     public void addRemovedField(Field field) {
         removedFields.add(field);
+        hasChanged = true;
+    }
+
+    public boolean isHasChanged() {
+        return hasChanged;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/73fad99a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
index f72b519..ff7d315 100644
--- 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
+++ 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
@@ -1,16 +1,15 @@
 package org.apache.drill.exec.store;
 
-import static com.fasterxml.jackson.core.JsonToken.END_ARRAY;
-import static com.fasterxml.jackson.core.JsonToken.END_OBJECT;
-import static com.fasterxml.jackson.core.JsonToken.FIELD_NAME;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.nio.charset.Charset;
-import java.util.List;
-import java.util.Map;
-
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import com.google.common.io.InputSupplier;
+import com.google.common.io.Resources;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.ExpressionPosition;
@@ -22,27 +21,19 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.schema.DiffSchema;
-import org.apache.drill.exec.schema.Field;
-import org.apache.drill.exec.schema.ListSchema;
-import org.apache.drill.exec.schema.NamedField;
-import org.apache.drill.exec.schema.ObjectSchema;
-import org.apache.drill.exec.schema.OrderedField;
-import org.apache.drill.exec.schema.RecordSchema;
-import org.apache.drill.exec.schema.SchemaIdGenerator;
+import org.apache.drill.exec.schema.*;
 import org.apache.drill.exec.schema.json.jackson.JacksonHelper;
 import org.apache.drill.exec.vector.*;
 
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
-import com.google.common.base.Charsets;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.io.Files;
-import com.google.common.io.InputSupplier;
-import com.google.common.io.Resources;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+import static com.fasterxml.jackson.core.JsonToken.*;
 
 public class JSONRecordReader implements RecordReader {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class);
@@ -89,7 +80,7 @@ public class JSONRecordReader implements RecordReader {
       if (inputPath.startsWith("resource:")) {
         input = 
Resources.newReaderSupplier(Resources.getResource(inputPath.substring(9)), 
Charsets.UTF_8);
       } else {
-        input = Files.newReaderSupplier(new File(inputPath), Charsets.UTF_8);
+        input = Files.newReaderSupplier(new File(URI.create(inputPath)), 
Charsets.UTF_8);
       }
 
       JsonFactory factory = new JsonFactory();
@@ -112,7 +103,7 @@ public class JSONRecordReader implements RecordReader {
     int nextRowIndex = 0;
 
     try {
-      while (ReadType.OBJECT.readRecord(null, this, null, nextRowIndex++)) {
+      while (ReadType.OBJECT.readRecord(null, this, null, nextRowIndex++, 0)) {
         parser.nextToken(); // Read to START_OBJECT token
 
         if (!parser.hasCurrentToken()) {
@@ -133,9 +124,19 @@ public class JSONRecordReader implements RecordReader {
         outputMutator.removeField(field.getAsMaterializedField());
       }
 
+      if (diffSchema.isHasChanged()) {
+        outputMutator.setNewSchema();
+      }
+
+
     } catch (IOException | SchemaChangeException e) {
       logger.error("Error reading next in Json reader", e);
     }
+
+    for (VectorHolder holder : valueVectorMap.values()) {
+      holder.populateVectorLength();
+    }
+
     return nextRowIndex;
   }
 
@@ -171,18 +172,10 @@ public class JSONRecordReader implements RecordReader {
     return removedFields;
   }
 
-  private DiffSchema getDiffSchema() {
-    return diffSchema;
-  }
-
   public BufferAllocator getAllocator() {
     return allocator;
   }
 
-  public OutputMutator getOutputMutator() {
-    return outputMutator;
-  }
-
   public static enum ReadType {
     ARRAY(END_ARRAY) {
       @Override
@@ -221,10 +214,12 @@ public class JSONRecordReader implements RecordReader {
       return endObject;
     }
 
+    @SuppressWarnings("ConstantConditions")
     public boolean readRecord(Field parentField,
                               JSONRecordReader reader,
                               String prefixFieldName,
-                              int rowIndex) throws IOException, 
SchemaChangeException {
+                              int rowIndex,
+                              int groupCount) throws IOException, 
SchemaChangeException {
       JsonParser parser = reader.getParser();
       JsonToken token = parser.nextToken();
       JsonToken endObject = getEndObject();
@@ -242,23 +237,26 @@ public class JSONRecordReader implements RecordReader {
         switch (token) {
           case START_ARRAY:
             readType = ReadType.ARRAY;
+            groupCount++;
             break;
           case START_OBJECT:
             readType = ReadType.OBJECT;
+            groupCount = 0;
             break;
         }
-        if (fieldType != null) { // Including nulls
-          boolean currentFieldNotFull = recordData(
-              parentField,
-              readType,
-              reader,
-              fieldType,
-              prefixFieldName,
-              fieldName,
-              rowIndex, colIndex);
-
-          isFull = isFull || !currentFieldNotFull;
 
+        if (fieldType != null) { // Including nulls
+          isFull = isFull ||
+              !recordData(
+                  parentField,
+                  readType,
+                  reader,
+                  fieldType,
+                  prefixFieldName,
+                  fieldName,
+                  rowIndex,
+                  colIndex,
+                  groupCount);
         }
         token = parser.nextToken();
         colIndex += 1;
@@ -286,7 +284,8 @@ public class JSONRecordReader implements RecordReader {
                                String prefixFieldName,
                                String fieldName,
                                int rowIndex,
-                               int colIndex) throws IOException, 
SchemaChangeException {
+                               int colIndex,
+                               int groupCount) throws IOException, 
SchemaChangeException {
       RecordSchema currentSchema = reader.getCurrentSchema();
       Field field = currentSchema.getField(fieldName, colIndex);
       boolean isFieldFound = field != null;
@@ -323,10 +322,14 @@ public class JSONRecordReader implements RecordReader {
         field.assignSchemaIfNull(newSchema);
 
         if (fieldSchema == null) reader.setCurrentSchema(newSchema);
-        readType.readRecord(field, reader, field.getFullFieldName(), rowIndex);
+        if(readType == ReadType.ARRAY) {
+          readType.readRecord(field, reader, field.getFullFieldName(), 
rowIndex, groupCount);
+        } else {
+          readType.readRecord(field, reader, field.getFullFieldName(), 
rowIndex, groupCount);
+        }
 
         reader.setCurrentSchema(currentSchema);
-      } else if (holder != null) {
+      } else {
         return addValueToVector(
             rowIndex,
             holder,
@@ -335,32 +338,54 @@ public class JSONRecordReader implements RecordReader {
                 reader.getParser(),
                 fieldType.getMinorType()
             ),
-            fieldType.getMinorType()
+            fieldType.getMinorType(),
+            groupCount
         );
       }
 
       return true;
     }
 
-    private static <T> boolean addValueToVector(int index, VectorHolder 
holder, BufferAllocator allocator, T val, MinorType minorType) {
+    private static <T> boolean addValueToVector(int index, VectorHolder 
holder, BufferAllocator allocator, T val, MinorType minorType, int groupCount) {
       switch (minorType) {
         case INT: {
-          holder.incAndCheckLength(32 + 1);
-          NullableIntVector int4 = (NullableIntVector) holder.getValueVector();
-          NullableIntVector.Mutator m = int4.getMutator();
-          if (val != null) {
-            m.set(index, (Integer) val);
+          holder.incAndCheckLength(32);
+          if (groupCount == 0) {
+            if (val != null) {
+              NullableIntVector int4 = (NullableIntVector) 
holder.getValueVector();
+              NullableIntVector.Mutator m = int4.getMutator();
+              m.set(index, (Integer) val);
+            }
+          } else {
+            if (val == null) {
+              throw new UnsupportedOperationException("Nullable repeated int 
is not supported.");
+            }
+
+            RepeatedIntVector repeatedInt4 = (RepeatedIntVector) 
holder.getValueVector();
+            RepeatedIntVector.Mutator m = repeatedInt4.getMutator();
+            m.add(index, (Integer) val);
           }
-          return holder.hasEnoughSpace(32 + 1);
+
+          return holder.hasEnoughSpace(32);
         }
         case FLOAT4: {
-          holder.incAndCheckLength(32 + 1);
-          NullableFloat4Vector float4 = (NullableFloat4Vector) 
holder.getValueVector();
-          NullableFloat4Vector.Mutator m = float4.getMutator();
-          if (val != null) {
-            m.set(index, (Float) val);
+          holder.incAndCheckLength(32);
+          if (groupCount == 0) {
+            if (val != null) {
+              NullableFloat4Vector float4 = (NullableFloat4Vector) 
holder.getValueVector();
+              NullableFloat4Vector.Mutator m = float4.getMutator();
+              m.set(index, (Float) val);
+            }
+          } else {
+            if (val == null) {
+              throw new UnsupportedOperationException("Nullable repeated float 
is not supported.");
+            }
+
+            RepeatedFloat4Vector repeatedFloat4 = (RepeatedFloat4Vector) 
holder.getValueVector();
+            RepeatedFloat4Vector.Mutator m = repeatedFloat4.getMutator();
+            m.add(groupCount, (Float) val);
           }
-          return holder.hasEnoughSpace(32 + 1);
+          return holder.hasEnoughSpace(32);
         }
         case VARCHAR: {
           if (val == null) {
@@ -369,10 +394,16 @@ public class JSONRecordReader implements RecordReader {
             byte[] bytes = ((String) val).getBytes(UTF_8);
             int length = bytes.length;
             holder.incAndCheckLength(length);
-            NullableVarCharVector varLen4 = (NullableVarCharVector) 
holder.getValueVector();
-            NullableVarCharVector.Mutator m = varLen4.getMutator();
-            m.set(index, bytes);
-            return holder.hasEnoughSpace(length + 4 + 1);
+            if (groupCount == 0) {
+              NullableVarCharVector varLen4 = (NullableVarCharVector) 
holder.getValueVector();
+              NullableVarCharVector.Mutator m = varLen4.getMutator();
+              m.set(index, bytes);
+            } else {
+              RepeatedVarCharVector repeatedVarLen4 = (RepeatedVarCharVector) 
holder.getValueVector();
+              RepeatedVarCharVector.Mutator m = repeatedVarLen4.getMutator();
+              m.add(index, bytes);
+            }
+            return holder.hasEnoughSpace(length);
           }
         }
         case BIT: {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/73fad99a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
index d594b9e..43d3cd9 100644
--- 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
+++ 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
@@ -18,58 +18,68 @@
 
 package org.apache.drill.exec.store;
 
-import org.apache.drill.exec.vector.FixedWidthVector;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.VariableWidthVector;
+import org.apache.drill.exec.vector.*;
 
 public class VectorHolder {
-    private int length;
-    private ValueVector vector;
-    private ValueVector.Mutator mutator;
-    private int currentLength;
+  private int count;
+  private int groupCount;
+  private int length;
+  private ValueVector vector;
+  private int currentLength;
 
-    VectorHolder(int length, ValueVector vector) {
-        this.length = length;
-        this.vector = vector;
-        this.mutator = vector.getMutator();
-    }
+  VectorHolder(int length, ValueVector vector) {
+    this.length = length;
+    this.vector = vector;
+  }
 
-    public ValueVector getValueVector() {
-        return vector;
-    }
+  public ValueVector getValueVector() {
+    return vector;
+  }
 
-    public void incAndCheckLength(int newLength) {
-        if (!hasEnoughSpace(newLength)) {
-            throw new BatchExceededException(length, currentLength + 
newLength);
-        }
-        currentLength += newLength;
+  public void incAndCheckLength(int newLength) {
+    if (!hasEnoughSpace(newLength)) {
+      throw new BatchExceededException(length, currentLength + newLength);
     }
+    count += 1;
+    currentLength += newLength;
+  }
 
-    public boolean hasEnoughSpace(int newLength) {
-        return length >= currentLength + newLength;
-    }
+  public void setGroupCount(int groupCount) {
+    this.groupCount = groupCount;
+  }
 
-    public int getLength() {
-        return length;
-    }
+  public boolean hasEnoughSpace(int newLength) {
+    return length >= currentLength + newLength;
+  }
 
-    public void reset() {
-        currentLength = 0;
-        allocateNew(length);
-        
-    }
-    
-    public void allocateNew(int valueLength){
-      if(vector instanceof FixedWidthVector){
-        ((FixedWidthVector)vector).allocateNew(valueLength);  
-      }else if(vector instanceof VariableWidthVector){
-        ((VariableWidthVector)vector).allocateNew(valueLength * 10, 
valueLength);  
-      }else{
-        throw new UnsupportedOperationException();
-      }
+  public int getLength() {
+    return length;
+  }
+
+  public void reset() {
+    currentLength = 0;
+    count = 0;
+    allocateNew(length);
+  }
+
+  public void populateVectorLength() {
+    ValueVector.Mutator mutator = vector.getMutator();
+    if(mutator instanceof NonRepeatedMutator) {
+      ((NonRepeatedMutator)mutator).setValueCount(count);
+    } else if(mutator instanceof RepeatedMutator) {
+      ((RepeatedMutator)mutator).setGroupAndValueCount(groupCount, count);
+    } else {
+      throw new UnsupportedOperationException("Mutator not supported: " + 
mutator.getClass().getName());
     }
-    
-    public ValueVector.Mutator getMutator(){
-      return mutator;
+  }
+
+  public void allocateNew(int valueLength) {
+    if (vector instanceof FixedWidthVector) {
+      ((FixedWidthVector) vector).allocateNew(valueLength);
+    } else if (vector instanceof VariableWidthVector) {
+      ((VariableWidthVector) vector).allocateNew(valueLength * 10, 
valueLength);
+    } else {
+      throw new UnsupportedOperationException();
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/73fad99a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java
 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java
index 17e072b..e5d7a30 100644
--- 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java
+++ 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java
@@ -7,7 +7,6 @@ public interface FixedWidthVector extends ValueVector{
   /**
    * Allocate a new memory space for this vector.  Must be called prior to 
using the ValueVector.
    *
-   * @param totalBytes   Desired size of the underlying data buffer.
    * @param valueCount   Number of values in the vector.
    */
   public void allocateNew(int valueCount);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/73fad99a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java
 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java
new file mode 100644
index 0000000..1227d02
--- /dev/null
+++ 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java
@@ -0,0 +1,23 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ 
******************************************************************************/
+
+package org.apache.drill.exec.vector;
+
+public interface RepeatedMutator extends ValueVector.Mutator {
+  public void setGroupAndValueCount(int groupCount, int valueCount);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/73fad99a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
 
b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
index e21289c..5d4e700 100644
--- 
a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
+++ 
b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
@@ -17,13 +17,19 @@
  
******************************************************************************/
 package org.apache.drill.exec.physical.impl;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
 
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.charset.Charset;
 import java.util.List;
 
+import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.exec.client.DrillClient;
 import org.apache.drill.exec.pop.PopUnitTestBase;
+import org.apache.drill.exec.proto.SchemaDefProtos;
 import org.apache.drill.exec.proto.UserProtos.QueryType;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
@@ -31,14 +37,19 @@ import org.apache.drill.exec.rpc.user.QueryResultBatch;
 import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.RemoteServiceSet;
 import org.apache.drill.exec.vector.ValueVector;
+import org.junit.Assert;
 import org.junit.Test;
 
+import com.carrotsearch.hppc.cursors.IntObjectCursor;
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
 
 public class TestSimpleFragmentRun extends PopUnitTestBase {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(TestSimpleFragmentRun.class);
 
+  private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+
   @Test
   public void runNoExchangeFragment() throws Exception {
     try(RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/73fad99a/sandbox/prototype/exec/java-exec/src/test/resources/physical_json_scan_test1.json
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/test/resources/physical_json_scan_test1.json
 
b/sandbox/prototype/exec/java-exec/src/test/resources/physical_json_scan_test1.json
new file mode 100644
index 0000000..91eb80c
--- /dev/null
+++ 
b/sandbox/prototype/exec/java-exec/src/test/resources/physical_json_scan_test1.json
@@ -0,0 +1,23 @@
+{
+    head:{
+        type:"APACHE_DRILL_PHYSICAL",
+        version:"1",
+        generator:{
+            type:"manual"
+        }
+    },
+       graph:[
+        {
+            @id:1,
+            pop:"json-scan",
+            entries:[
+               {url: 
"file:////home/tnachen/src/incubator-drill/sandbox/prototype/exec/java-exec/src/test/resources/scan_json_test_1.json"}
+            ]
+        },
+        {
+            @id: 2,
+            child: 1,
+            pop: "screen"
+        }
+    ]
+}
\ No newline at end of file

Reply via email to