I have had some success on Drill + Daffodil integration, but I do need assistance now.
I am at the point where I am clearly not using the API correctly to populate arrays, and I am not releasing resources that must be managed.

If you get my fork+branch of Daffodil (drill-exp2 branch) from here:
https://github.com/mbeckerle/daffodil/releases/tag/drill-exp-2023-11-07
you can unzip the binary zip file into the ~/.m2/repository/org/apache/daffodil directory, and this will enable my fork+branch of Drill to compile and run.

My Drill fork and branch (daffodil-2835) are here:
https://github.com/mbeckerle/drill/tree/daffodil-2835

A PR with the current state of the code is here:
https://github.com/apache/drill/pull/2836

The tests that are failing are named testComplexQuery1 and testComplexArrayQuery1. They are in
contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/TestDaffodilReader.java

The "action", as it were, is mostly in these files:

(1) contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java
which accepts Daffodil SAX-style parse output events and populates Drill rows, columns, and arrays. This is a stateful event handler that maintains a stack of the current Drill TupleWriter/ArrayWriter. A breakpoint in each of its handler methods easily shows what is happening on these small tests.

(2) contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java
which implements the setup of the batch reader and the next() routine, which invokes the Daffodil parse() call (which in turn invokes the DaffodilDrillInfosetOutputter above). This DaffodilBatchReader.java file is likely where resources are not being set up to be released properly.

testComplexQuery1 uses a DFDL schema for a complex type containing two integers. It works, in that the Drill query result contains the two values.
If this were creating JSON, the output data would look like:

    { "ex_r": { "a1": "257", "a2": "258" } }

However, after returning that result, and after the JUnit test itself reports passing, I get the errors shown in the attached drill-testComplexQuery1-output.txt, which seem to be related to not releasing resources properly.

testComplexArrayQuery1 uses a similar schema, but it has a repeating (array) element named 'record', and each array element contains two integers. The test data has 3 such records. If this were creating JSON, the output data would look like:

    { "ex_r": {
        "record": [
          { "a1": "257", "a2": "258" },
          { "a1": "259", "a2": "260" },
          { "a1": "261", "a2": "262" }
        ]
    } }

This works in that, in the debugger, I can see it walk all 3 test records, parse all the data, and make calls to populate data in Drill. But it seems I am missing something important about how the arrays are properly created, populated, and closed, because it gives an array-related error message and never gets to the point where a Drill result is returned. See the attached drill-testComplexArrayQuery1-output.txt.

Any help is greatly appreciated.

Mike Beckerle
Apache Daffodil PMC | daffodil.apache.org
OGF DFDL Workgroup Co-Chair | www.ogf.org/ogf/doku.php/standards/dfdl/dfdl
Owl Cyber Defense | www.owlcyberdefense.com
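For readers following the description of DaffodilDrillInfosetOutputter, the "stack of current writers" pattern can be sketched roughly as below. This is only an illustration under loud assumptions: the class InfosetStackSketch and all of its methods are hypothetical, and plain Java Maps and Lists stand in for Drill's TupleWriter and ArrayWriter. It uses no Drill or Daffodil API, and any per-element or per-row bookkeeping the real writers require is not modeled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in, NOT Drill or Daffodil API: a Map plays the role of
// a tuple writer and a List plays the role of an array writer.
class InfosetStackSketch {
  private final Map<String, Object> root = new LinkedHashMap<>();
  // Top of the stack is the container currently being populated.
  private final Deque<Object> stack = new ArrayDeque<>();

  InfosetStackSketch() {
    stack.push(root);
  }

  // A complex element starts: create a child tuple and make it current.
  void startComplex(String name) {
    Map<String, Object> child = new LinkedHashMap<>();
    attach(name, child);
    stack.push(child);
  }

  void endComplex() {
    stack.pop();
  }

  // An array starts: create a list and make it current.
  void startArray(String name) {
    List<Object> items = new ArrayList<>();
    attach(name, items);
    stack.push(items);
  }

  void endArray() {
    stack.pop();
  }

  // A simple (scalar) element: store it in the current container.
  void simple(String name, Object value) {
    attach(name, value);
  }

  // Inside an array the value becomes the next element (the name is not
  // needed); inside a tuple it becomes a named member.
  @SuppressWarnings("unchecked")
  private void attach(String name, Object value) {
    Object top = stack.peek();
    if (top instanceof List) {
      ((List<Object>) top).add(value);
    } else {
      ((Map<String, Object>) top).put(name, value);
    }
  }

  Map<String, Object> result() {
    if (stack.size() != 1) {
      throw new IllegalStateException("unbalanced start/end events");
    }
    return root;
  }

  // Replays the event sequence for the three-record array example.
  static Map<String, Object> demo() {
    int[][] records = {{257, 258}, {259, 260}, {261, 262}};
    InfosetStackSketch h = new InfosetStackSketch();
    h.startComplex("ex_r");
    h.startArray("record");
    for (int[] r : records) {
      h.startComplex("record");
      h.simple("a1", r[0]);
      h.simple("a2", r[1]);
      h.endComplex();
    }
    h.endArray();
    h.endComplex();
    return h.result();
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

The point of the sketch is only that every start event pushes exactly one container and every end event pops exactly one, so the handler always knows where the next value belongs. The sketch checks that balance in result().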
#: `ex_r` STRUCT<`a1` INT NOT NULL, `a2` INT NOT NULL>
0: {257, 258}
java.lang.RuntimeException: Exception while closing
    at org.apache.drill.common.DrillAutoCloseables.closeNoChecked(DrillAutoCloseables.java:46)
    at org.apache.drill.exec.client.DrillClient.close(DrillClient.java:481)
    at org.apache.drill.test.ClientFixture.close(ClientFixture.java:259)
    at org.apache.drill.common.AutoCloseables.close(AutoCloseables.java:91)
    at org.apache.drill.common.AutoCloseables.close(AutoCloseables.java:71)
    at org.apache.drill.test.ClusterTest.shutdown(ClusterTest.java:88)
Caused by: java.lang.IllegalStateException: Allocator[ROOT] closed with outstanding buffers allocated (1).
Allocator(ROOT) 0/128/4352/2684354560 (res/actual/peak/limit)
  child allocators: 0
  ledgers: 1
    ledger[68] allocator: ROOT), isOwning: true, size: 128, references: 2, life: 315432066733998..0, allocatorManager: [62, life: 315432066691268..0] holds 8 buffers.
        DrillBuf[94], udle: [63 100..104]
        DrillBuf[82], udle: [63 0..128]
        DrillBuf[83], udle: [63 10..94]
        DrillBuf[92], udle: [63 96..100]
        DrillBuf[93], udle: [63 100..104]
        DrillBuf[84], udle: [63 96..104]
        DrillBuf[91], udle: [63 96..100]
        DrillBuf[90], udle: [63 96..104]
  reservations: 0
    at org.apache.drill.exec.memory.BaseAllocator.close(BaseAllocator.java:502)
    at org.apache.drill.common.DrillAutoCloseables.closeNoChecked(DrillAutoCloseables.java:44)
    ... 5 more
Found one or more vector errors from OperatorRecordBatch
record - RepeatedMapVector: Row count = 1, but value count = 0

org.apache.drill.common.exceptions.UserRemoteException: SYSTEM ERROR: IllegalStateException: Batch validation failed. Source operator: OperatorRecordBatch

Fragment: 0:0

Please, refer to logs for more information.

[Error Id: 058bffb0-6992-4c58-b453-b70f3abc733c on vicuna:31010]
    at org.apache.drill.exec.rpc.user.QueryResultHandler.resultArrived(QueryResultHandler.java:125)
    at org.apache.drill.exec.rpc.user.UserClient.handle(UserClient.java:422)
    at org.apache.drill.exec.rpc.user.UserClient.handle(UserClient.java:96)
    at org.apache.drill.exec.rpc.RpcBus$InboundHandler.decode(RpcBus.java:271)
    at org.apache.drill.exec.rpc.RpcBus$InboundHandler.decode(RpcBus.java:241)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalStateException: Batch validation failed. Source operator: OperatorRecordBatch
    at org.apache.drill.exec.physical.impl.validate.IteratorValidatorBatchIterator.validateBatch(IteratorValidatorBatchIterator.java:341)
    at org.apache.drill.exec.physical.impl.validate.IteratorValidatorBatchIterator.next(IteratorValidatorBatchIterator.java:249)
    at org.apache.drill.exec.physical.impl.BaseRootExec.next(BaseRootExec.java:103)
    at org.apache.drill.exec.physical.impl.S