[ 
https://issues.apache.org/jira/browse/DRILL-2835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801835#comment-17801835
 ] 

ASF GitHub Bot commented on DRILL-2835:
---------------------------------------

mbeckerle commented on code in PR #2836:
URL: https://github.com/apache/drill/pull/2836#discussion_r1439542636


##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Objects;
+
+import org.apache.daffodil.japi.DataProcessor;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import 
org.apache.drill.exec.store.daffodil.schema.DaffodilDataProcessorFactory;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils.daffodilDataProcessorToDrillSchema;
+
+
+public class DaffodilBatchReader implements ManagedReader {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(DaffodilBatchReader.class);
+  private final DaffodilFormatConfig dafConfig;
+  private final RowSetLoader rowSetLoader;
+  private final CustomErrorContext errorContext;
+  private final DaffodilMessageParser dafParser;
+  private final InputStream dataInputStream;
+
+  static class DaffodilReaderConfig {
+    final DaffodilFormatPlugin plugin;
+    DaffodilReaderConfig(DaffodilFormatPlugin plugin) {
+      this.plugin = plugin;
+    }
+  }
+
+  public DaffodilBatchReader (DaffodilReaderConfig readerConfig, EasySubScan 
scan, FileSchemaNegotiator negotiator) {
+
+    errorContext = negotiator.parentErrorContext();
+    this.dafConfig = readerConfig.plugin.getConfig();
+
+    String schemaURIString = dafConfig.getSchemaURI(); // 
"schema/complexArray1.dfdl.xsd";
+    String rootName = dafConfig.getRootName();
+    String rootNamespace = dafConfig.getRootNamespace();
+    boolean validationMode = dafConfig.getValidationMode();
+
+    URI dfdlSchemaURI;
+    try {
+      dfdlSchemaURI = new URI(schemaURIString);
+    } catch (URISyntaxException e) {
+      throw UserException.validationError(e)
+          .build(logger);
+    }
+
+    FileDescrip file = negotiator.file();
+    DrillFileSystem fs = file.fileSystem();
+    URI fsSchemaURI = fs.getUri().resolve(dfdlSchemaURI);
+
+
+    DaffodilDataProcessorFactory dpf = new DaffodilDataProcessorFactory();
+    DataProcessor dp;
+    try {
+      dp = dpf.getDataProcessor(fsSchemaURI, validationMode, rootName, 
rootNamespace);
+    } catch (Exception e) {
+      throw UserException.dataReadError(e)
+          .message(String.format("Failed to get Daffodil DFDL processor for: 
%s", fsSchemaURI))
+          .addContext(errorContext).addContext(e.getMessage()).build(logger);
+    }
+    // Create the corresponding Drill schema.
+    // Note: this could be a very large schema. Think of a large complex RDBMS 
schema,
+    // all of it, hundreds of tables, but all part of the same metadata tree.
+    TupleMetadata drillSchema = daffodilDataProcessorToDrillSchema(dp);
+    // Inform Drill about the schema
+    negotiator.tableSchema(drillSchema, true);
+
+    //
+    // DATA TIME: Next we construct the runtime objects, and open files.
+    //
+    // We get the DaffodilMessageParser, which is a stateful driver for 
daffodil that
+    // actually does the parsing.
+    rowSetLoader = negotiator.build().writer();
+
+    // We construct the Daffodil InfosetOutputter which the daffodil parser 
uses to
+    // convert infoset event calls to fill in a Drill row via a rowSetLoader.
+    DaffodilDrillInfosetOutputter outputter = new 
DaffodilDrillInfosetOutputter(rowSetLoader);
+
+    // Now we can setup the dafParser with the outputter it will drive with
+    // the parser-produced infoset.
+    dafParser = new DaffodilMessageParser(dp); // needs further initialization 
after this.
+    dafParser.setInfosetOutputter(outputter);
+
+    Path dataPath = file.split().getPath();
+    // Lastly, we open the data stream
+    try {
+      dataInputStream = fs.openPossiblyCompressedStream(dataPath);
+    } catch (Exception e) {

Review Comment:
   Make narrower catch



##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.daffodil;
+
+import org.apache.daffodil.runtime1.api.ComplexElementMetadata;
+import org.apache.daffodil.runtime1.api.ElementMetadata;
+import org.apache.daffodil.runtime1.api.InfosetArray;
+import org.apache.daffodil.runtime1.api.InfosetComplexElement;
+import org.apache.daffodil.japi.infoset.InfosetOutputter;
+import org.apache.daffodil.runtime1.api.InfosetSimpleElement;
+import org.apache.daffodil.runtime1.api.PrimitiveType;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaVisitor;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ColumnWriter;
+import org.apache.drill.exec.vector.accessor.ObjectType;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Stack;
+
+/**
+ * Adapts Daffodil parser infoset event calls to Drill writer calls
+ * to fill in Drill data rows.
+ */
+public class DaffodilDrillInfosetOutputter
+    extends InfosetOutputter {
+
+  private boolean isOriginalRoot() {
+    boolean result = currentTupleWriter() == rowSetWriter;
+    if (result)
+      assert(tupleWriterStack.size() == 1);
+    return result;
+  }
+
+  /**
+   * True if the next startComplex call will be for the
+   * DFDL infoset root element whose children are the columns of
+   * the row set.
+   */
+  private boolean isRootElement = true;
+
+  /**
+   * Stack that is used only if we have sub-structures that are not
+   * simple-type fields of the row.
+   */
+  private final Stack<TupleWriter> tupleWriterStack = new Stack<>();
+
+  private final Stack<ArrayWriter> arrayWriterStack = new Stack<>();
+
+  private TupleWriter currentTupleWriter() {
+    return tupleWriterStack.peek();
+  }
+
+  private ArrayWriter currentArrayWriter() {
+    return arrayWriterStack.peek();
+  }
+
+
+  private static final Logger logger = 
LoggerFactory.getLogger(DaffodilDrillInfosetOutputter.class);
+
+  private DaffodilDrillInfosetOutputter() {} // no default constructor
+
+  private RowSetLoader rowSetWriter;
+
+  public DaffodilDrillInfosetOutputter(RowSetLoader writer) {
+    this.rowSetWriter = writer;
+    this.tupleWriterStack.push(writer);
+  }
+
+  @Override
+  public void reset() {
+    tupleWriterStack.clear();
+    tupleWriterStack.push(rowSetWriter);
+    arrayWriterStack.clear();
+    this.isRootElement = true;
+    checkCleanState();
+  }
+
+  private void checkCleanState() {
+    assert(isOriginalRoot());
+    assert(arrayWriterStack.isEmpty());
+    assert(isRootElement);
+  }
+
+  @Override
+  public void startDocument() {
+    checkCleanState();
+  }
+
+  @Override
+  public void endDocument() {
+    checkCleanState();
+  }
+
+  private String colName(ElementMetadata md) {
+    return DrillDaffodilSchemaVisitor.makeColumnName(md);
+  }
+
+  @Override
+  public void startSimple(InfosetSimpleElement ise) {
+    assert (!isRootElement);
+    ElementMetadata md = ise.metadata();
+    String colName = colName(md);
+    ColumnWriter cw;
+    if (md.isArray()) {
+      // A simple type array
+      assert(!arrayWriterStack.isEmpty());
+      cw = currentArrayWriter().scalar();
+    } else {
+      // A simple element within a map
+      // Note the map itself might be an array
+      // but we don't care about that here.
+      cw = currentTupleWriter().column(colName);
+    }
+    ColumnMetadata cm = cw.schema();
+    assert(cm.isScalar());
+    if (md.isNillable() && ise.isNilled()) {
+      assert cm.isNullable();
+      cw.setNull();
+    } else {
+      convertDaffodilValueToDrillValue(ise, cm, cw);
+    }
+  }
+
+  @Override
+  public void endSimple(InfosetSimpleElement diSimple) {
+    assert (!isRootElement);
+    // do nothing
+  }
+
+  @Override
+  public void startComplex(InfosetComplexElement ce) {
+    ComplexElementMetadata md = ce.metadata();
+    String colName = colName(ce.metadata());
+    if (isRootElement) {
+      assert(isOriginalRoot());
+      // This complex element's corresponds to the root element of the
+      // DFDL schema. We don't treat this as a column of the row set.
+      // Rather, it's children are the columns of the row set.
+      //
+      // If we do nothing at all here, then we'll start getting
+      // even calls for the children.
+      isRootElement = false;
+      return;
+    }
+    if (md.isArray()) {
+      assert(!arrayWriterStack.isEmpty());
+      // FIXME: is this the way to add a complex array child item (i.e., each 
array item is a map)
+      tupleWriterStack.push(currentArrayWriter().tuple());
+    } else {
+      tupleWriterStack.push(currentTupleWriter().tuple(colName));
+    }
+  }
+
+  @Override
+  public void endComplex(InfosetComplexElement ce) {
+    ComplexElementMetadata md = ce.metadata();
+    if (isOriginalRoot()) {
+      isRootElement = true;
+      // do nothing else. The row gets closed-out in the 
DaffodilBatchReader.next() method.
+    } else {
+      // it's a map.
+      // We seem to not need to do anything to end the map. No action taken 
here works.
+      if (md.isArray()) {
+        assert (!arrayWriterStack.isEmpty());
+        currentArrayWriter().save(); // required for map array entries.
+      }
+      tupleWriterStack.pop();
+    }
+  }
+
+  @Override
+  public void startArray(InfosetArray diArray) {
+    ElementMetadata md = diArray.metadata();
+    assert (md.isArray());
+    // DFDL has no notion of an array directly within another array. A named 
field (map) is necessary
+    // before you can have another array.
+    assert (currentTupleWriter().type() == ObjectType.TUPLE); // parent is a 
map, or the top level row.
+    String colName = colName(md);
+    TupleWriter enclosingParentTupleWriter = currentTupleWriter();
+    ArrayWriter aw = enclosingParentTupleWriter.array(colName);
+    arrayWriterStack.push(aw);
+  }
+
+  @Override
+  public void endArray(InfosetArray ia) {
+    ElementMetadata md = ia.metadata();
+    assert (md.isArray());
+    assert (!arrayWriterStack.empty());
+    // FIXME: How do we end/close-out an array?
+    // note that each array instance, when the instance is a map, must have
+    // save called after it is written to the array but that happens
+    // in endComplex events since it must be called not once per array, but
+    // once per array item.
+    arrayWriterStack.pop();
+  }
+
+  private void convertDaffodilValueToDrillValue(InfosetSimpleElement ise, 
ColumnMetadata cm, ColumnWriter cw) {
+    PrimitiveType dafType = ise.metadata().primitiveType();
+    TypeProtos.MinorType drillType = 
DrillDaffodilSchemaUtils.getDrillDataType(dafType);
+    assert(drillType == cm.type());
+    switch (drillType) {
+    case INT: {
+      //
+      // FIXME: Javadoc for setObject says "primarily for testing"
+      // So how are we supposed to assign the column value then?
+      // Is there a way to get from a ColumnWriter to a typed scalar writer 
(downcast perhaps?)
+      cw.setObject(ise.getInt());

Review Comment:
   I did it this way because I could not find a way to get from these objects 
(cm, cw) to a ScalarWriter or something with a typed setter. I did look for 
that. 
   
   Can you look at this code and the call to it? Someplace I'm missing a step 
so that I have access to the objects with the typed set methods. 
    



##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Objects;
+
+import org.apache.daffodil.japi.DataProcessor;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import 
org.apache.drill.exec.store.daffodil.schema.DaffodilDataProcessorFactory;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils.daffodilDataProcessorToDrillSchema;
+
+
+public class DaffodilBatchReader implements ManagedReader {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(DaffodilBatchReader.class);
+  private final DaffodilFormatConfig dafConfig;
+  private final RowSetLoader rowSetLoader;
+  private final CustomErrorContext errorContext;
+  private final DaffodilMessageParser dafParser;
+  private final InputStream dataInputStream;
+
+  static class DaffodilReaderConfig {
+    final DaffodilFormatPlugin plugin;
+    DaffodilReaderConfig(DaffodilFormatPlugin plugin) {
+      this.plugin = plugin;
+    }
+  }
+
+  public DaffodilBatchReader (DaffodilReaderConfig readerConfig, EasySubScan 
scan, FileSchemaNegotiator negotiator) {
+
+    errorContext = negotiator.parentErrorContext();
+    this.dafConfig = readerConfig.plugin.getConfig();
+
+    String schemaURIString = dafConfig.getSchemaURI(); // 
"schema/complexArray1.dfdl.xsd";
+    String rootName = dafConfig.getRootName();
+    String rootNamespace = dafConfig.getRootNamespace();
+    boolean validationMode = dafConfig.getValidationMode();
+
+    URI dfdlSchemaURI;
+    try {
+      dfdlSchemaURI = new URI(schemaURIString);
+    } catch (URISyntaxException e) {
+      throw UserException.validationError(e)
+          .build(logger);
+    }
+
+    FileDescrip file = negotiator.file();
+    DrillFileSystem fs = file.fileSystem();
+    URI fsSchemaURI = fs.getUri().resolve(dfdlSchemaURI);
+
+
+    DaffodilDataProcessorFactory dpf = new DaffodilDataProcessorFactory();
+    DataProcessor dp;
+    try {
+      dp = dpf.getDataProcessor(fsSchemaURI, validationMode, rootName, 
rootNamespace);
+    } catch (Exception e) {
+      throw UserException.dataReadError(e)
+          .message(String.format("Failed to get Daffodil DFDL processor for: 
%s", fsSchemaURI))
+          .addContext(errorContext).addContext(e.getMessage()).build(logger);
+    }
+    // Create the corresponding Drill schema.
+    // Note: this could be a very large schema. Think of a large complex RDBMS 
schema,
+    // all of it, hundreds of tables, but all part of the same metadata tree.
+    TupleMetadata drillSchema = daffodilDataProcessorToDrillSchema(dp);
+    // Inform Drill about the schema
+    negotiator.tableSchema(drillSchema, true);
+
+    //
+    // DATA TIME: Next we construct the runtime objects, and open files.
+    //
+    // We get the DaffodilMessageParser, which is a stateful driver for 
daffodil that
+    // actually does the parsing.
+    rowSetLoader = negotiator.build().writer();
+
+    // We construct the Daffodil InfosetOutputter which the daffodil parser 
uses to
+    // convert infoset event calls to fill in a Drill row via a rowSetLoader.
+    DaffodilDrillInfosetOutputter outputter = new 
DaffodilDrillInfosetOutputter(rowSetLoader);
+
+    // Now we can setup the dafParser with the outputter it will drive with
+    // the parser-produced infoset.
+    dafParser = new DaffodilMessageParser(dp); // needs further initialization 
after this.
+    dafParser.setInfosetOutputter(outputter);
+
+    Path dataPath = file.split().getPath();
+    // Lastly, we open the data stream
+    try {
+      dataInputStream = fs.openPossiblyCompressedStream(dataPath);
+    } catch (Exception e) {
+      throw UserException.dataReadError(e)
+          .message(String.format("Failed to open input file: %s", 
dataPath.toString()))
+          .addContext(errorContext).addContext(e.getMessage()).build(logger);
+    }
+    // And lastly,... tell daffodil the input data stream.
+    dafParser.setInputStream(dataInputStream);
+  }
+
+
+  /**
+   * This is the core of actual processing - data movement from Daffodil to 
Drill.
+   * <p>
+   * If there is space in the batch, and there is data available to parse
+   * then this calls the daffodil parser, which parses data, delivering it to 
the rowWriter
+   * by way of the infoset outputter.
+   * <p>
+   * Repeats until the rowWriter is full (a batch is full), or there is no 
more data, or
+   * a parse error ends execution with a throw.
+   * <p>
+   * Validation errors and other warnings are not errors and are logged but do 
not cause
+   * parsing to fail/throw.
+   * @return true if there are rows retrieved, false if no rows were 
retrieved, which means
+   * no more will ever be retrieved (end of data).
+   * @throws RuntimeException on parse errors.
+   */
+  @Override
+  public boolean next() {
+    // Check assumed invariants
+    // We don't know if there is data or not. This could be called on an empty 
data file.
+    // We DO know that this won't be called if there is no space in the batch 
for even 1
+    // row.
+    if (dafParser.isEOF()) {
+      return false; // return without even checking for more rows or trying to 
parse.
+    }
+    while (rowSetLoader.start() && !dafParser.isEOF()) { // we never zero-trip 
this loop.
+      // the predicate is always true once.
+      try {
+        dafParser.parse();
+        if (dafParser.isProcessingError()) {
+          assert(Objects.nonNull(dafParser.getDiagnostics()));
+          throw 
UserException.dataReadError().message(dafParser.getDiagnosticsAsString())
+              .addContext(errorContext).build(logger);
+        }
+        if (dafParser.isValidationError()) {
+          logger.warn(dafParser.getDiagnosticsAsString());
+          // Note that even if daffodil is set to not validate, validation 
errors may still occur
+          // from DFDL's "recoverableError" assertions.
+        }
+      } catch (Exception e) {
+        throw UserException.dataReadError(e).message("Error parsing file: " + 
e.getMessage())
+            .addContext(errorContext).build(logger);
+      }
+      rowSetLoader.save();
+    }
+    int nRows = rowSetLoader.rowCount();

Review Comment:
   We don't need it, but I like to do this.  This is really only to make the 
API invariant clear, which is that Drill depends on the nRows not being zero 
(or rather, is allowed to depend on that). This is my understanding of how the 
Drill API works.  I inferred this from other code that I was using as a model 
when writing this code. 
   
   I like to test and make explicit as many such assumptions as possible, and I 
prefer code/assert to just comments to that effect. I also like to leave such 
tests in the code. It's only in inner loops or where profiling shows it matters 
that I would normally convert this into comments.  Fields are the 'inner loop' 
of this code area, not rows, I would claim. 



##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Objects;
+
+import org.apache.daffodil.japi.DataProcessor;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import 
org.apache.drill.exec.store.daffodil.schema.DaffodilDataProcessorFactory;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils.daffodilDataProcessorToDrillSchema;
+
+
+public class DaffodilBatchReader implements ManagedReader {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(DaffodilBatchReader.class);
+  private final DaffodilFormatConfig dafConfig;
+  private final RowSetLoader rowSetLoader;
+  private final CustomErrorContext errorContext;
+  private final DaffodilMessageParser dafParser;
+  private final InputStream dataInputStream;
+
+  static class DaffodilReaderConfig {
+    final DaffodilFormatPlugin plugin;
+    DaffodilReaderConfig(DaffodilFormatPlugin plugin) {
+      this.plugin = plugin;
+    }
+  }
+
+  public DaffodilBatchReader (DaffodilReaderConfig readerConfig, EasySubScan 
scan, FileSchemaNegotiator negotiator) {
+
+    errorContext = negotiator.parentErrorContext();
+    this.dafConfig = readerConfig.plugin.getConfig();
+
+    String schemaURIString = dafConfig.getSchemaURI(); // 
"schema/complexArray1.dfdl.xsd";
+    String rootName = dafConfig.getRootName();
+    String rootNamespace = dafConfig.getRootNamespace();
+    boolean validationMode = dafConfig.getValidationMode();
+
+    URI dfdlSchemaURI;
+    try {
+      dfdlSchemaURI = new URI(schemaURIString);
+    } catch (URISyntaxException e) {
+      throw UserException.validationError(e)
+          .build(logger);
+    }
+
+    FileDescrip file = negotiator.file();
+    DrillFileSystem fs = file.fileSystem();
+    URI fsSchemaURI = fs.getUri().resolve(dfdlSchemaURI);
+
+
+    DaffodilDataProcessorFactory dpf = new DaffodilDataProcessorFactory();
+    DataProcessor dp;
+    try {
+      dp = dpf.getDataProcessor(fsSchemaURI, validationMode, rootName, 
rootNamespace);
+    } catch (Exception e) {
+      throw UserException.dataReadError(e)
+          .message(String.format("Failed to get Daffodil DFDL processor for: 
%s", fsSchemaURI))
+          .addContext(errorContext).addContext(e.getMessage()).build(logger);
+    }
+    // Create the corresponding Drill schema.
+    // Note: this could be a very large schema. Think of a large complex RDBMS 
schema,
+    // all of it, hundreds of tables, but all part of the same metadata tree.
+    TupleMetadata drillSchema = daffodilDataProcessorToDrillSchema(dp);
+    // Inform Drill about the schema
+    negotiator.tableSchema(drillSchema, true);
+
+    //
+    // DATA TIME: Next we construct the runtime objects, and open files.
+    //
+    // We get the DaffodilMessageParser, which is a stateful driver for 
daffodil that
+    // actually does the parsing.
+    rowSetLoader = negotiator.build().writer();
+
+    // We construct the Daffodil InfosetOutputter which the daffodil parser 
uses to
+    // convert infoset event calls to fill in a Drill row via a rowSetLoader.
+    DaffodilDrillInfosetOutputter outputter = new 
DaffodilDrillInfosetOutputter(rowSetLoader);
+
+    // Now we can setup the dafParser with the outputter it will drive with
+    // the parser-produced infoset.
+    dafParser = new DaffodilMessageParser(dp); // needs further initialization 
after this.
+    dafParser.setInfosetOutputter(outputter);
+
+    Path dataPath = file.split().getPath();
+    // Lastly, we open the data stream
+    try {
+      dataInputStream = fs.openPossiblyCompressedStream(dataPath);
+    } catch (Exception e) {
+      throw UserException.dataReadError(e)
+          .message(String.format("Failed to open input file: %s", 
dataPath.toString()))
+          .addContext(errorContext).addContext(e.getMessage()).build(logger);
+    }
+    // And lastly,... tell daffodil the input data stream.
+    dafParser.setInputStream(dataInputStream);
+  }
+
+
+  /**
+   * This is the core of actual processing - data movement from Daffodil to 
Drill.
+   * <p>
+   * If there is space in the batch, and there is data available to parse
+   * then this calls the daffodil parser, which parses data, delivering it to 
the rowWriter
+   * by way of the infoset outputter.
+   * <p>
+   * Repeats until the rowWriter is full (a batch is full), or there is no 
more data, or
+   * a parse error ends execution with a throw.
+   * <p>
+   * Validation errors and other warnings are not errors and are logged but do 
not cause
+   * parsing to fail/throw.
+   * @return true if there are rows retrieved, false if no rows were 
retrieved, which means
+   * no more will ever be retrieved (end of data).
+   * @throws RuntimeException on parse errors.
+   */
+  @Override
+  public boolean next() {
+    // Check assumed invariants
+    // We don't know if there is data or not. This could be called on an empty 
data file.
+    // We DO know that this won't be called if there is no space in the batch 
for even 1
+    // row.
+    if (dafParser.isEOF()) {
+      return false; // return without even checking for more rows or trying to 
parse.
+    }
+    while (rowSetLoader.start() && !dafParser.isEOF()) { // we never zero-trip 
this loop.
+      // the predicate is always true once.
+      try {
+        dafParser.parse();
+        if (dafParser.isProcessingError()) {
+          assert(Objects.nonNull(dafParser.getDiagnostics()));
+          throw 
UserException.dataReadError().message(dafParser.getDiagnosticsAsString())
+              .addContext(errorContext).build(logger);
+        }
+        if (dafParser.isValidationError()) {
+          logger.warn(dafParser.getDiagnosticsAsString());
+          // Note that even if daffodil is set to not validate, validation 
errors may still occur
+          // from DFDL's "recoverableError" assertions.
+        }
+      } catch (Exception e) {
+        throw UserException.dataReadError(e).message("Error parsing file: " + 
e.getMessage())
+            .addContext(errorContext).build(logger);
+      }
+      rowSetLoader.save();
+    }
+    int nRows = rowSetLoader.rowCount();
+    assert nRows > 0; // This cannot be zero. If the parse failed we will have 
already thrown out of here.
+    return true;
+  }
+
+  @Override
+  public void close() {
+    AutoCloseables.closeSilently(dataInputStream);

Review Comment:
   This is the only close required by Daffodil. 



##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.daffodil;
+
+import org.apache.daffodil.runtime1.api.ComplexElementMetadata;
+import org.apache.daffodil.runtime1.api.ElementMetadata;
+import org.apache.daffodil.runtime1.api.InfosetArray;
+import org.apache.daffodil.runtime1.api.InfosetComplexElement;
+import org.apache.daffodil.japi.infoset.InfosetOutputter;
+import org.apache.daffodil.runtime1.api.InfosetSimpleElement;
+import org.apache.daffodil.runtime1.api.PrimitiveType;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaVisitor;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ColumnWriter;
+import org.apache.drill.exec.vector.accessor.ObjectType;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Stack;
+
+/**
+ * Adapts Daffodil parser infoset event calls to Drill writer calls
+ * to fill in Drill data rows.
+ */
+public class DaffodilDrillInfosetOutputter
+    extends InfosetOutputter {
+
+  private boolean isOriginalRoot() {
+    boolean result = currentTupleWriter() == rowSetWriter;
+    if (result)
+      assert(tupleWriterStack.size() == 1);
+    return result;
+  }
+
+  /**
+   * True if the next startComplex call will be for the
+   * DFDL infoset root element whose children are the columns of
+   * the row set.
+   */
+  private boolean isRootElement = true;
+
+  /**
+   * Stack that is used only if we have sub-structures that are not
+   * simple-type fields of the row.
+   */
+  private final Stack<TupleWriter> tupleWriterStack = new Stack<>();
+
+  private final Stack<ArrayWriter> arrayWriterStack = new Stack<>();
+
+  private TupleWriter currentTupleWriter() {
+    return tupleWriterStack.peek();
+  }
+
+  private ArrayWriter currentArrayWriter() {
+    return arrayWriterStack.peek();
+  }
+
+
+  private static final Logger logger = 
LoggerFactory.getLogger(DaffodilDrillInfosetOutputter.class);
+
+  private DaffodilDrillInfosetOutputter() {} // no default constructor
+
+  private RowSetLoader rowSetWriter;
+
+  public DaffodilDrillInfosetOutputter(RowSetLoader writer) {
+    this.rowSetWriter = writer;
+    this.tupleWriterStack.push(writer);
+  }
+
+  @Override
+  public void reset() {
+    tupleWriterStack.clear();
+    tupleWriterStack.push(rowSetWriter);
+    arrayWriterStack.clear();
+    this.isRootElement = true;
+    checkCleanState();
+  }
+
+  private void checkCleanState() {
+    assert(isOriginalRoot());
+    assert(arrayWriterStack.isEmpty());
+    assert(isRootElement);
+  }
+
+  @Override
+  public void startDocument() {
+    checkCleanState();
+  }
+
+  @Override
+  public void endDocument() {
+    checkCleanState();
+  }
+
+  private String colName(ElementMetadata md) {
+    return DrillDaffodilSchemaVisitor.makeColumnName(md);
+  }
+
+  @Override
+  public void startSimple(InfosetSimpleElement ise) {
+    assert (!isRootElement);
+    ElementMetadata md = ise.metadata();
+    String colName = colName(md);
+    ColumnWriter cw;
+    if (md.isArray()) {
+      // A simple type array
+      assert(!arrayWriterStack.isEmpty());
+      cw = currentArrayWriter().scalar();
+    } else {
+      // A simple element within a map
+      // Note the map itself might be an array
+      // but we don't care about that here.
+      cw = currentTupleWriter().column(colName);
+    }
+    ColumnMetadata cm = cw.schema();
+    assert(cm.isScalar());
+    if (md.isNillable() && ise.isNilled()) {
+      assert cm.isNullable();
+      cw.setNull();
+    } else {
+      convertDaffodilValueToDrillValue(ise, cm, cw);
+    }
+  }
+
+  @Override
+  public void endSimple(InfosetSimpleElement diSimple) {
+    assert (!isRootElement);
+    // do nothing
+  }
+
+  @Override
+  public void startComplex(InfosetComplexElement ce) {
+    ComplexElementMetadata md = ce.metadata();
+    String colName = colName(ce.metadata());
+    if (isRootElement) {
+      assert(isOriginalRoot());
+      // This complex element's corresponds to the root element of the
+      // DFDL schema. We don't treat this as a column of the row set.
+      // Rather, it's children are the columns of the row set.
+      //
+      // If we do nothing at all here, then we'll start getting
+      // even calls for the children.
+      isRootElement = false;
+      return;
+    }
+    if (md.isArray()) {
+      assert(!arrayWriterStack.isEmpty());
+      // FIXME: is this the way to add a complex array child item (i.e., each 
array item is a map)
+      tupleWriterStack.push(currentArrayWriter().tuple());
+    } else {
+      tupleWriterStack.push(currentTupleWriter().tuple(colName));
+    }
+  }
+
+  @Override
+  public void endComplex(InfosetComplexElement ce) {
+    ComplexElementMetadata md = ce.metadata();
+    if (isOriginalRoot()) {
+      isRootElement = true;
+      // do nothing else. The row gets closed-out in the 
DaffodilBatchReader.next() method.
+    } else {
+      // it's a map.
+      // We seem to not need to do anything to end the map. No action taken 
here works.
+      if (md.isArray()) {
+        assert (!arrayWriterStack.isEmpty());
+        currentArrayWriter().save(); // required for map array entries.
+      }
+      tupleWriterStack.pop();
+    }
+  }
+
+  @Override
+  public void startArray(InfosetArray diArray) {
+    ElementMetadata md = diArray.metadata();
+    assert (md.isArray());
+    // DFDL has no notion of an array directly within another array. A named 
field (map) is necessary
+    // before you can have another array.
+    assert (currentTupleWriter().type() == ObjectType.TUPLE); // parent is a 
map, or the top level row.
+    String colName = colName(md);
+    TupleWriter enclosingParentTupleWriter = currentTupleWriter();
+    ArrayWriter aw = enclosingParentTupleWriter.array(colName);
+    arrayWriterStack.push(aw);
+  }
+
+  @Override
+  public void endArray(InfosetArray ia) {
+    ElementMetadata md = ia.metadata();
+    assert (md.isArray());
+    assert (!arrayWriterStack.empty());
+    // FIXME: How do we end/close-out an array?
+    // note that each array instance, when the instance is a map, must have
+    // save called after it is written to the array but that happens
+    // in endComplex events since it must be called not once per array, but
+    // once per array item.
+    arrayWriterStack.pop();
+  }
+
+  private void convertDaffodilValueToDrillValue(InfosetSimpleElement ise, 
ColumnMetadata cm, ColumnWriter cw) {
+    PrimitiveType dafType = ise.metadata().primitiveType();
+    TypeProtos.MinorType drillType = 
DrillDaffodilSchemaUtils.getDrillDataType(dafType);
+    assert(drillType == cm.type());
+    switch (drillType) {
+    case INT: {
+      //
+      // FIXME: Javadoc for setObject says "primarily for testing"
+      // So how are we supposed to assign the column value then?
+      // Is there a way to get from a ColumnWriter to a typed scalar writer 
(downcast perhaps?)
+      cw.setObject(ise.getInt());
+      break;
+    }
+    case BIGINT: {
+      cw.setObject(ise.getLong());
+      break;
+    }
+    case SMALLINT: {
+      cw.setObject(ise.getShort());
+      break;
+    }
+    case TINYINT: {
+      cw.setObject(ise.getByte());
+      break;
+    }
+//        .put("UNSIGNEDLONG", TypeProtos.MinorType.UINT8)
+//        .put("UNSIGNEDINT", TypeProtos.MinorType.UINT4)
+//        .put("UNSIGNEDSHORT", TypeProtos.MinorType.UINT2)
+//        .put("UNSIGNEDBYTE", TypeProtos.MinorType.UINT1)
+//        .put("INTEGER", TypeProtos.MinorType.BIGINT)
+//        .put("NONNEGATIVEINTEGER", TypeProtos.MinorType.BIGINT)
+    case BIT: {
+      cw.setObject(ise.getBoolean());
+      break;
+    }
+//        .put("DATE", TypeProtos.MinorType.DATE) // requires conversion
+//        .put("DATETIME", TypeProtos.MinorType.TIMESTAMP) // requires 
conversion
+//        .put("DECIMAL", TypeProtos.MinorType.VARDECIMAL) // requires 
conversion (maybe)
+    case FLOAT8: {
+      cw.setObject(ise.getDouble());
+      break;
+    }
+    case FLOAT4: {
+      cw.setObject(ise.getFloat());
+      break;
+    }
+    case VARBINARY: {
+      cw.setObject(ise.getHexBinary());
+      break;
+    }
+    case VARCHAR: {
+      //
+      // FIXME: VARCHAR is defined in drill as utf8 string.
+      // Is Drill expecting something other than a Java string in this 
setObject call?
+      // Should we be mapping Daffodil strings to Drill VAR16CHAR type?
+      //
+      String s = ise.getString();
+      cw.setObject(s);
+      break;
+    }
+//        .put("TIME", TypeProtos.MinorType.TIME) // requires conversion

Review Comment:
   Yes Daffodil has date, time, and dateTime types.  At the API level we have 
essentially Java's ordinary calendar-related classes in hand at this point, so 
it should be straightforward to consume those when creating Drill's equivalent 
types. 
   
   However, I stubbed all this out until resolving the typed setter issue.



##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.daffodil;
+
+import org.apache.daffodil.runtime1.api.ComplexElementMetadata;
+import org.apache.daffodil.runtime1.api.ElementMetadata;
+import org.apache.daffodil.runtime1.api.InfosetArray;
+import org.apache.daffodil.runtime1.api.InfosetComplexElement;
+import org.apache.daffodil.japi.infoset.InfosetOutputter;
+import org.apache.daffodil.runtime1.api.InfosetSimpleElement;
+import org.apache.daffodil.runtime1.api.PrimitiveType;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaVisitor;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ColumnWriter;
+import org.apache.drill.exec.vector.accessor.ObjectType;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Stack;
+
+/**
+ * Adapts Daffodil parser infoset event calls to Drill writer calls
+ * to fill in Drill data rows.
+ */
+public class DaffodilDrillInfosetOutputter
+    extends InfosetOutputter {
+
+  private boolean isOriginalRoot() {
+    boolean result = currentTupleWriter() == rowSetWriter;

Review Comment:
   Is the Drill coding style defined in a wiki or other doc page somewhere? I 
didn't find one. 
   
   If this is just java-standard, then I need reminding, as I have not coded 
Java prior to this effort for 12+ years now. 





> IndexOutOfBoundsException in partition sender when doing streaming aggregate 
> with LIMIT 
> ----------------------------------------------------------------------------------------
>
>                 Key: DRILL-2835
>                 URL: https://issues.apache.org/jira/browse/DRILL-2835
>             Project: Apache Drill
>          Issue Type: Bug
>          Components: Execution - RPC
>    Affects Versions: 0.8.0
>            Reporter: Aman Sinha
>            Assignee: Venki Korukanti
>            Priority: Major
>             Fix For: 0.9.0
>
>         Attachments: DRILL-2835-1.patch, DRILL-2835-2.patch
>
>
> Following CTAS run on a TPC-DS 100GB scale factor on a 10-node cluster: 
> {code}
> alter session set `planner.enable_hashagg` = false;
> alter session set `planner.enable_multiphase_agg` = true;
> create table dfs.tmp.stream9 as 
> select cr_call_center_sk , cr_catalog_page_sk ,  cr_item_sk , cr_reason_sk , 
> cr_refunded_addr_sk , count(*) from catalog_returns_dri100 
>  group by cr_call_center_sk , cr_catalog_page_sk ,  cr_item_sk , cr_reason_sk 
> , cr_refunded_addr_sk
>  limit 100
> ;
> {code}
> {code}
> Caused by: java.lang.IndexOutOfBoundsException: index: 1023, length: 1 
> (expected: range(0, 0))
>         at io.netty.buffer.DrillBuf.checkIndexD(DrillBuf.java:200) 
> ~[drill-java-exec-0.9.0-SNAPSHOT-rebuffed.jar:4.0.24.Final]
>         at io.netty.buffer.DrillBuf.chk(DrillBuf.java:222) 
> ~[drill-java-exec-0.9.0-SNAPSHOT-rebuffed.jar:4.0.24.Final]
>         at io.netty.buffer.DrillBuf.setByte(DrillBuf.java:621) 
> ~[drill-java-exec-0.9.0-SNAPSHOT-rebuffed.jar:4.0.24.Final]
>         at 
> org.apache.drill.exec.vector.UInt1Vector$Mutator.set(UInt1Vector.java:342) 
> ~[drill-java-exec-0.9.0-SNAPSHOT-rebuffed.jar:0.9.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.vector.NullableBigIntVector$Mutator.set(NullableBigIntVector.java:372)
>  ~[drill-java-exec-0.9.0-SNAPSHOT-rebuffed.jar:0.9.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.vector.NullableBigIntVector.copyFrom(NullableBigIntVector.java:284)
>  ~[drill-java-exec-0.9.0-SNAPSHOT-rebuffed.jar:0.9.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.test.generated.PartitionerGen4$OutgoingRecordBatch.doEval(PartitionerTemplate.java:370)
>  ~[na:na]
>         at 
> org.apache.drill.exec.test.generated.PartitionerGen4$OutgoingRecordBatch.copy(PartitionerTemplate.java:249)
>  ~[na:na]
>         at 
> org.apache.drill.exec.test.generated.PartitionerGen4.doCopy(PartitionerTemplate.java:208)
>  ~[na:na]
>         at 
> org.apache.drill.exec.test.generated.PartitionerGen4.partitionBatch(PartitionerTemplate.java:176)
>  ~[na:na]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to