Github user jinfengni commented on a diff in the pull request: https://github.com/apache/drill/pull/906#discussion_r135977556 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java --- @@ -39,88 +35,107 @@ import org.apache.drill.exec.expr.ValueVectorWriteExpression; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.UnionAll; -import org.apache.drill.exec.record.AbstractRecordBatch; +import org.apache.drill.exec.record.AbstractBinaryRecordBatch; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.record.WritableBatch; -import org.apache.drill.exec.record.selection.SelectionVector2; -import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.resolver.TypeCastRules; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.FixedWidthVector; import org.apache.drill.exec.vector.SchemaChangeCallBack; import org.apache.drill.exec.vector.ValueVector; -import com.google.common.collect.Lists; +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Stack; -public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> { +public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionAllRecordBatch.class); - private List<MaterializedField> outputFields; + private SchemaChangeCallBack callBack = new SchemaChangeCallBack(); private UnionAller unionall; - private UnionAllInput unionAllInput; - private RecordBatch current; - private final 
List<TransferPair> transfers = Lists.newArrayList(); - private List<ValueVector> allocationVectors; - protected SchemaChangeCallBack callBack = new SchemaChangeCallBack(); + private List<ValueVector> allocationVectors = Lists.newArrayList(); private int recordCount = 0; - private boolean schemaAvailable = false; + private UnionInputIterator unionInputIterator; public UnionAllRecordBatch(UnionAll config, List<RecordBatch> children, FragmentContext context) throws OutOfMemoryException { - super(config, context, false); - assert (children.size() == 2) : "The number of the operands of Union must be 2"; - unionAllInput = new UnionAllInput(this, children.get(0), children.get(1)); - } - - @Override - public int getRecordCount() { - return recordCount; + super(config, context, true, children.get(0), children.get(1)); } @Override protected void killIncoming(boolean sendUpstream) { - unionAllInput.getLeftRecordBatch().kill(sendUpstream); - unionAllInput.getRightRecordBatch().kill(sendUpstream); + left.kill(sendUpstream); + right.kill(sendUpstream); } - @Override - public SelectionVector2 getSelectionVector2() { - throw new UnsupportedOperationException("UnionAllRecordBatch does not support selection vector"); - } + protected void buildSchema() throws SchemaChangeException { + if (! 
prefetchFirstBatchFromBothSides()) { + return; + } - @Override - public SelectionVector4 getSelectionVector4() { - throw new UnsupportedOperationException("UnionAllRecordBatch does not support selection vector"); + unionInputIterator = new UnionInputIterator(leftUpstream, left, rightUpstream, right); + + if (leftUpstream == IterOutcome.NONE && rightUpstream == IterOutcome.OK_NEW_SCHEMA) { + inferOutputFieldsOneSide(right.getSchema()); + } else if (rightUpstream == IterOutcome.NONE && leftUpstream == IterOutcome.OK_NEW_SCHEMA) { + inferOutputFieldsOneSide((left.getSchema())); + } else if (leftUpstream == IterOutcome.OK_NEW_SCHEMA && rightUpstream == IterOutcome.OK_NEW_SCHEMA) { + inferOutputFieldsBothSide(left.getSchema(), right.getSchema()); + } + + container.buildSchema(BatchSchema.SelectionVectorMode.NONE); + + for (VectorWrapper vv: container) { + vv.getValueVector().allocateNew(); + vv.getValueVector().getMutator().setValueCount(0); + } --- End diff -- Why would you think the above four lines of code are **_"copied"_** from somewhere else, while the two lines of code are NOT "copied"? You could say the two lines are shorter, but I cannot see why the four lines would count as copied while the two lines of code would NOT. With that said, thanks for pointing me to this new util method. I was not aware of it until now. In the revised patch, I used the util method to reduce the 4 lines of code to 2 lines.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---