cgivre commented on code in PR #2836:
URL: https://github.com/apache/drill/pull/2836#discussion_r1364795241
##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.daffodil.japi.DataProcessor;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DaffodilDataProcessorFactory;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils.daffodilDataProcessorToDrillSchema;
+
+
+public class DaffodilBatchReader implements ManagedReader {
+
+  private static final Logger logger = LoggerFactory.getLogger(DaffodilBatchReader.class);
+  private final DaffodilFormatConfig formatConfig;
+  private final RowSetLoader rowSetLoader;
+  private final CustomErrorContext errorContext;
+  private final DaffodilMessageParser dafParser;
+  private final boolean validationMode;
+
+  private final InputStream dataInputStream;
+
+  static class DaffodilReaderConfig {
+    final DaffodilFormatPlugin plugin;
+    DaffodilReaderConfig(DaffodilFormatPlugin plugin) {
+      this.plugin = plugin;
+    }
+  }
+
+  public DaffodilBatchReader (DaffodilReaderConfig readerConfig, EasySubScan scan, FileSchemaNegotiator negotiator) {
+
+    errorContext = negotiator.parentErrorContext();
+    this.formatConfig = readerConfig.plugin.getConfig();
+
+    this.validationMode = formatConfig.getValidationMode();
+
+    //
+    // FIXME: Next, a MIRACLE occurs.
+    //
+    // We get the dfdlSchemaURI filled in from the query, or a default config location
+    // We get the rootName (or null if not supplied) from the query, or a default config location
+    // We get the rootNamespace (or null if not supplied) from the query, or a default config location
+    // We get the validationMode (true/false) filled in from the query or a default config location
+    // We get the dataInputURI filled in from the query, or from a default config location
+    //
+    // For a first cut, let's just fake it. :-)
+
+    String rootName = null;
+    String rootNamespace = null;

+    URI dfdlSchemaURI;
+    URI dataInputURI;
+
+    try {

Review Comment:
   A few things...
   1. I added config variables to the config for the `rootName` and `rootNamespace`. This means that you can set default values in the config or override them in the query.
   2. It looks to me like we should do the same for the schema URI as well. I think the object you're looking for here to access the file system is the `negotiator.file().fileSystem()` object. With that object you can access the file system directly, either via a `Path` or a `URI`. Take a peek at some of the methods available to you there. As an example, we do something similar in the SHP file reader:
   https://github.com/apache/drill/blob/2ab46a9411a52f12a0f9acb1144a318059439bc4/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpBatchReader.java#L77-L83
   A rough, untested sketch of that idea for this reader is below.
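   Here `getSchemaURI()` is a hypothetical accessor on `DaffodilFormatConfig` (a schema location that defaults in the plugin config and can be overridden per query), so treat this as illustration only:
   ```java
   // Rough sketch only: resolve the DFDL schema location against Drill's file system
   // instead of building a bare relative java.net.URI. getSchemaURI() is a
   // hypothetical DaffodilFormatConfig accessor, not something in this PR.
   private URI resolveDfdlSchemaUri(FileSchemaNegotiator negotiator, DaffodilFormatConfig config) {
     DrillFileSystem fs = negotiator.file().fileSystem();
     Path schemaPath = new Path(config.getSchemaURI());
     // makeQualified() fills in the scheme and authority of the underlying file
     // system, so a workspace-relative path in the config becomes a full URI.
     return fs.makeQualified(schemaPath).toUri();
   }
   ```
   That needs the `org.apache.drill.exec.store.dfs.DrillFileSystem` and `org.apache.hadoop.fs.Path` imports, and from there the schema bytes could be read with `fs.open(schemaPath)` rather than going through `URI.toURL().openStream()`.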
##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java:
##########
+    //
+    // FIXME: Next, a MIRACLE occurs.
+    //
+    // We get the dfdlSchemaURI filled in from the query, or a default config location
+    // We get the rootName (or null if not supplied) from the query, or a default config location
+    // We get the rootNamespace (or null if not supplied) from the query, or a default config location
+    // We get the validationMode (true/false) filled in from the query or a default config location
+    // We get the dataInputURI filled in from the query, or from a default config location
+    //
+    // For a first cut, let's just fake it. :-)
+
+    String rootName = null;
+    String rootNamespace = null;
+
+    URI dfdlSchemaURI;
+    URI dataInputURI;
+
+    try {
+      dfdlSchemaURI = new URI("schema/complexArray1.dfdl.xsd");
+      dataInputURI = new URI("data/complexArray1.dat");
+    } catch (URISyntaxException e) {
+      throw UserException.validationError(e)
+          .message("Error retrieving DFDL schema files")
+          .build(logger);
+    }
+
+
+    // given dfdlSchemaURI and validation settings, and rootName, rootNamespace optionally
+    // get the Daffodil DataProcessor (aka parser static information) that we need, and from that
+    // we get the DaffodilMessageParser, which is a stateful driver for daffodil that actually does
+    // parsing.
+    DaffodilDataProcessorFactory dpf = new DaffodilDataProcessorFactory();
+    DataProcessor dp;
+    try {
+      dp = dpf.getDataProcessor(dfdlSchemaURI, true, rootName, rootNamespace);
+    } catch (Exception e) {
+      throw UserException.dataReadError(e)
+          .message(String.format("Failed to get Daffodil DFDL processor for: %s", dfdlSchemaURI.toString()))
+          .addContext(errorContext).addContext(e.getMessage()).build(logger);
+    }
+    // Create the corresponding Drill schema
+    TupleMetadata drillSchema = daffodilDataProcessorToDrillSchema(dp);
+    // Inform Drill about the schema
+    negotiator.tableSchema(drillSchema, true);
+    //
+    // FIXME: Now a MIRACLE occurs. We get the drill row writer (actually a rowSetLoader)??
+    //
+    rowSetLoader = negotiator.build().writer(); // FIXME: is this right?
+
+    // We construct the Daffodil InfosetOutputter which the daffodil parser uses to
+    // convert infoset event calls to fill in a Drill row via a rowSetLoader.
+    DaffodilDrillInfosetOutputter outputter = new DaffodilDrillInfosetOutputter(rowSetLoader);
+    // Now we can set up the dafParser with the outputter it will drive with the parser-produced
+    // infoset.
+    dafParser = new DaffodilMessageParser(dp); // needs further initialization after this.
+    dafParser.setInfosetOutputter(outputter);
+    // Lastly, we open the data stream
+    try {
+      dataInputStream = dataInputURI.toURL().openStream();

Review Comment:
   Ok, I'm not sure why we need to do this. Drill can get you an input stream for the input file. All you need to do is:
   ```java
   dataInputStream = negotiator.file().fileSystem().openPossiblyCompressedStream(negotiator.file().split().getPath());
   ```
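   For completeness, an untested sketch of how that suggestion could slot into the constructor, reusing the error-handling style already in this class; it would also let you drop the `dataInputURI` handling entirely:
   ```java
   // Sketch only: open the data through Drill's file system instead of
   // dataInputURI.toURL().openStream(); the catch block mirrors the ones above.
   try {
     dataInputStream = negotiator.file().fileSystem()
         .openPossiblyCompressedStream(negotiator.file().split().getPath());
   } catch (Exception e) {
     throw UserException.dataReadError(e)
         .message("Failed to open input file: %s", negotiator.file().split().getPath())
         .addContext(errorContext)
         .build(logger);
   }
   ```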