cgivre commented on code in PR #2836:
URL: https://github.com/apache/drill/pull/2836#discussion_r1364795241


##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.daffodil.japi.DataProcessor;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DaffodilDataProcessorFactory;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils.daffodilDataProcessorToDrillSchema;
+
+
+public class DaffodilBatchReader implements ManagedReader {
+
+  private static final Logger logger = LoggerFactory.getLogger(DaffodilBatchReader.class);
+  private final DaffodilFormatConfig formatConfig;
+  private final RowSetLoader rowSetLoader;
+  private final CustomErrorContext errorContext;
+  private final DaffodilMessageParser dafParser;
+  private final boolean validationMode;
+
+  private final InputStream dataInputStream;
+
+  static class DaffodilReaderConfig {
+    final DaffodilFormatPlugin plugin;
+    DaffodilReaderConfig(DaffodilFormatPlugin plugin) {
+      this.plugin = plugin;
+    }
+  }
+
+  public DaffodilBatchReader(DaffodilReaderConfig readerConfig, EasySubScan scan, FileSchemaNegotiator negotiator) {
+
+    errorContext = negotiator.parentErrorContext();
+    this.formatConfig = readerConfig.plugin.getConfig();
+
+    this.validationMode = formatConfig.getValidationMode();
+
+    //
+    // FIXME: Next, a MIRACLE occurs.
+    //
+    // We get the dfdlSchemaURI filled in from the query, or a default config location
+    // We get the rootName (or null if not supplied) from the query, or a default config location
+    // We get the rootNamespace (or null if not supplied) from the query, or a default config location
+    // We get the validationMode (true/false) filled in from the query or a default config location
+    // We get the dataInputURI filled in from the query, or from a default config location
+    //
+    // For a first cut, let's just fake it. :-)
+
+    String rootName = null;
+    String rootNamespace = null;
+
+    URI dfdlSchemaURI;
+    URI dataInputURI;
+
+    try {

Review Comment:
   A few things...
   1. I added config variables for the `rootName` and `rootNamespace`. This means you can set default values in the config or override them in the query.
   2. It looks to me like we should do the same for the schema URI as well.
   
   I think the object you're looking for here, to access the file system, is `negotiator.file().fileSystem()`. With that object you can access the file system directly via either `Path` or `URI`. Take a peek at the methods available to you there.
   
   As an example, the SHP file reader does something similar:
   
   https://github.com/apache/drill/blob/2ab46a9411a52f12a0f9acb1144a318059439bc4/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpBatchReader.java#L77-L83
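   To make the precedence concrete, here's a minimal, self-contained sketch of the pattern (no Drill classes; the map-based lookup, key names, and values are illustrative only): a value supplied in the query wins, otherwise the plugin-config default applies.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   // Illustrative sketch of query-over-config precedence for reader settings
   // such as rootName, rootNamespace, and the schema URI.
   public class ConfigPrecedence {
     // Return the query-level value if present, else the config default.
     static String resolve(Map<String, String> queryProps,
                           Map<String, String> configDefaults,
                           String key) {
       String fromQuery = queryProps.get(key);
       return fromQuery != null ? fromQuery : configDefaults.get(key);
     }
   
     public static void main(String[] args) {
       Map<String, String> defaults = new HashMap<>();
       defaults.put("rootName", "defaultRoot");
       defaults.put("schemaURI", "schema/default.dfdl.xsd");
   
       Map<String, String> query = new HashMap<>();
       query.put("schemaURI", "schema/complexArray1.dfdl.xsd"); // query override
   
       System.out.println(resolve(query, defaults, "rootName"));  // falls back to default
       System.out.println(resolve(query, defaults, "schemaURI")); // query wins
     }
   }
   ```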
   
   
   



##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.daffodil.japi.DataProcessor;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DaffodilDataProcessorFactory;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils.daffodilDataProcessorToDrillSchema;
+
+
+public class DaffodilBatchReader implements ManagedReader {
+
+  private static final Logger logger = LoggerFactory.getLogger(DaffodilBatchReader.class);
+  private final DaffodilFormatConfig formatConfig;
+  private final RowSetLoader rowSetLoader;
+  private final CustomErrorContext errorContext;
+  private final DaffodilMessageParser dafParser;
+  private final boolean validationMode;
+
+  private final InputStream dataInputStream;
+
+  static class DaffodilReaderConfig {
+    final DaffodilFormatPlugin plugin;
+    DaffodilReaderConfig(DaffodilFormatPlugin plugin) {
+      this.plugin = plugin;
+    }
+  }
+
+  public DaffodilBatchReader(DaffodilReaderConfig readerConfig, EasySubScan scan, FileSchemaNegotiator negotiator) {
+
+    errorContext = negotiator.parentErrorContext();
+    this.formatConfig = readerConfig.plugin.getConfig();
+
+    this.validationMode = formatConfig.getValidationMode();
+
+    //
+    // FIXME: Next, a MIRACLE occurs.
+    //
+    // We get the dfdlSchemaURI filled in from the query, or a default config location
+    // We get the rootName (or null if not supplied) from the query, or a default config location
+    // We get the rootNamespace (or null if not supplied) from the query, or a default config location
+    // We get the validationMode (true/false) filled in from the query or a default config location
+    // We get the dataInputURI filled in from the query, or from a default config location
+    //
+    // For a first cut, let's just fake it. :-)
+
+    String rootName = null;
+    String rootNamespace = null;
+
+    URI dfdlSchemaURI;
+    URI dataInputURI;
+
+    try {
+      dfdlSchemaURI = new URI("schema/complexArray1.dfdl.xsd");
+      dataInputURI = new URI("data/complexArray1.dat");
+    } catch (URISyntaxException e) {
+      throw UserException.validationError(e)
+          .message("Error retrieving DFDL schema files")
+          .build(logger);
+    }
+
+
+    // Given dfdlSchemaURI, validation settings, and optionally rootName and rootNamespace,
+    // get the Daffodil DataProcessor (i.e. the parser's static information) that we need. From
+    // that we get the DaffodilMessageParser, a stateful driver for Daffodil that actually does
+    // the parsing.
+    DaffodilDataProcessorFactory dpf = new DaffodilDataProcessorFactory();
+    DataProcessor dp;
+    try {
+      dp = dpf.getDataProcessor(dfdlSchemaURI, true, rootName, rootNamespace);
+    } catch (Exception e) {
+      throw UserException.dataReadError(e)
+          .message(String.format("Failed to get Daffodil DFDL processor for: %s", dfdlSchemaURI.toString()))
+          .addContext(errorContext).addContext(e.getMessage()).build(logger);
+    }
+    // Create the corresponding Drill schema
+    TupleMetadata drillSchema = daffodilDataProcessorToDrillSchema(dp);
+    // Inform Drill about the schema
+    negotiator.tableSchema(drillSchema, true);
+    //
+    // FIXME: Now a MIRACLE occurs. We get the Drill row writer (actually a rowSetLoader)??
+    //
+    rowSetLoader = negotiator.build().writer(); // FIXME: is this right?
+
+    // We construct the Daffodil InfosetOutputter, which the Daffodil parser uses to
+    // convert infoset event calls into a Drill row via a rowSetLoader.
+    DaffodilDrillInfosetOutputter outputter = new DaffodilDrillInfosetOutputter(rowSetLoader);
+    // Now we can set up the dafParser with the outputter it will drive with the
+    // parser-produced infoset.
+    dafParser = new DaffodilMessageParser(dp); // needs further initialization after this.
+    dafParser.setInfosetOutputter(outputter);
+    // Lastly, we open the data stream
+    try {
+      dataInputStream = dataInputURI.toURL().openStream();

Review Comment:
   Ok, I'm not sure why we need to do this. Drill can get you an input stream of the input file. All you need to do is:
   
   ```java
   dataInputStream = negotiator.file().fileSystem().openPossiblyCompressedStream(negotiator.file().split().getPath());
   ```
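   Whichever way the stream is opened, the reader's `close()` should release it. A stand-alone sketch of that open-in-constructor / release-in-`close()` lifecycle (plain `java.io` stand-ins here, since the Drill file-system call above won't compile outside Drill; in the real reader, `AutoCloseables.closeSilently(dataInputStream)` from the existing imports would do the closing):
   
   ```java
   import java.io.ByteArrayInputStream;
   import java.io.IOException;
   import java.io.InputStream;
   
   // Generic sketch: acquire the stream when the reader is constructed,
   // release it in close(). ByteArrayInputStream stands in for the file stream.
   public class StreamLifecycle implements AutoCloseable {
     private final InputStream dataInputStream;
   
     public StreamLifecycle(InputStream in) {
       this.dataInputStream = in; // in the reader: opened from Drill's file system
     }
   
     @Override
     public void close() {
       try {
         dataInputStream.close();
       } catch (IOException e) {
         // swallowed, mirroring what AutoCloseables.closeSilently would do
       }
     }
   
     public static void main(String[] args) {
       try (StreamLifecycle reader =
           new StreamLifecycle(new ByteArrayInputStream(new byte[] {1, 2, 3}))) {
         System.out.println("stream open");
       }
       System.out.println("stream released");
     }
   }
   ```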



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@drill.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
