This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 79ea7b1f13c4864bfc9a25049b25b3369fce07cc
Author: Paul Rogers <[email protected]>
AuthorDate: Tue Dec 10 20:15:32 2019 -0800

    DRILL-7476: Set lastSet on TransferPair copies

    Variable-width nullable vectors maintain a "lastSet" field in the
    mutator. This field is used in "fill empties" logic when setting the
    vector's value count. This is true even if the vector is read-only, or
    has been transferred from another (read-only) vector. LastSet must be
    set to the row count or the code will helpfully overwrite existing
    offsets with 0.

    closes #1922
---
 .../org/apache/drill/exec/client/DrillClient.java  |  6 +-
 .../exec/physical/impl/limit/LimitRecordBatch.java |  5 +-
 .../unorderedreceiver/UnorderedReceiverBatch.java  |  2 +-
 .../physical/impl/validate/BatchValidator.java     | 43 +++++++++-
 .../drill/exec/record/FragmentWritableBatch.java   | 80 ++++++++++--------
 .../apache/drill/exec/record/WritableBatch.java    |  7 +-
 .../drill/exec/work/fragment/FragmentExecutor.java | 95 ++++++++++++++--------
 .../java/org/apache/drill/test/QueryBuilder.java   | 20 ++---
 .../codegen/templates/NullableValueVectors.java    |  9 ++
 9 files changed, 181 insertions(+), 86 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 7dc4d59..237aba1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -112,8 +112,8 @@ public class DrillClient implements Closeable, ConnectionThrottle {
   private volatile ClusterCoordinator clusterCoordinator;
   private volatile boolean connected = false;
   private final BufferAllocator allocator;
-  private int reconnectTimes;
-  private int reconnectDelay;
+  private final int reconnectTimes;
+  private final int reconnectDelay;
   private boolean supportComplexTypes;
   private final boolean ownsZkConnection;
   private final boolean ownsAllocator;
@@ -862,7 +862,7 @@ public class DrillClient implements Closeable, ConnectionThrottle {

     @Override
     public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
-      logger.debug("Result arrived: Result: {}", result );
+      logger.debug("Result arrived: Result: {}", result);
       results.add(result);
     }

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index e9d7dd3..25c79ac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -41,7 +41,7 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
   private static final Logger logger = LoggerFactory.getLogger(LimitRecordBatch.class);

-  private SelectionVector2 outgoingSv;
+  private final SelectionVector2 outgoingSv;
   private SelectionVector2 incomingSv;

   // Start offset of the records
@@ -234,7 +234,8 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
     outgoingSv.setRecordCount(svIndex);
     outgoingSv.setBatchActualRecordCount(inputRecordCount);
     // Actual number of values in the container; not the number in
-    // the SV.
+    // the SV. Set record count, not value count. Value count is
+    // carried over from input vectors.
     container.setRecordCount(inputRecordCount);
     // Update the start offset
     recordStartOffset = 0;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index d40bd6d..f5fb77a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -206,7 +206,7 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
       stats.addLongStat(Metric.BYTES_RECEIVED, batch.getByteCount());

       batch.release();
-      if(schemaChanged) {
+      if (schemaChanged) {
        this.schema = batchLoader.getSchema();
        stats.batchReceived(0, rbd.getRecordCount(), true);
        lastOutcome = IterOutcome.OK_NEW_SCHEMA;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
index 36d9c8f..2c657e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
@@ -25,6 +25,10 @@ import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.BitVector;
 import org.apache.drill.exec.vector.FixedWidthVector;
+import org.apache.drill.exec.vector.NullableVar16CharVector;
+import org.apache.drill.exec.vector.NullableVarBinaryVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.NullableVarDecimalVector;
 import org.apache.drill.exec.vector.NullableVector;
 import org.apache.drill.exec.vector.RepeatedBitVector;
 import org.apache.drill.exec.vector.UInt1Vector;
@@ -321,10 +325,38 @@ public class BatchValidator {
           "Outer value count = %d, but inner value count = %d",
           outerCount, valueCount));
     }
+    int lastSet = getLastSet(vector);
+    if (lastSet != -2) {
+      if (lastSet != valueCount - 1) {
+        error(name, vector, String.format(
+            "Value count = %d, but last set = %d",
+            valueCount, lastSet));
+      }
+    }
     verifyIsSetVector(vector, (UInt1Vector) vector.getBitsVector());
     validateVector(name + "-values", valuesVector);
   }

+  // getLastSet() is visible per vector type, not on a super class.
+  // There is no common nullable, variable width super class.
+
+  private int getLastSet(NullableVector vector) {
+    if (vector instanceof NullableVarCharVector) {
+      return ((NullableVarCharVector) vector).getMutator().getLastSet();
+    }
+    if (vector instanceof NullableVarBinaryVector) {
+      return ((NullableVarBinaryVector) vector).getMutator().getLastSet();
+    }
+    if (vector instanceof NullableVarDecimalVector) {
+      return ((NullableVarDecimalVector) vector).getMutator().getLastSet();
+    }
+    if (vector instanceof NullableVar16CharVector) {
+      return ((NullableVar16CharVector) vector).getMutator().getLastSet();
+    }
+    // Otherwise, return a value that is never legal for lastSet
+    return -2;
+  }
+
   private void validateVarCharVector(String name, VarCharVector vector) {
     int dataLength = vector.getBuffer().writerIndex();
     validateVarWidthVector(name, vector, dataLength);
@@ -332,7 +364,12 @@

   private void validateVarBinaryVector(String name, VarBinaryVector vector) {
     int dataLength = vector.getBuffer().writerIndex();
-    validateVarWidthVector(name, vector, dataLength);
+    int lastOffset = validateVarWidthVector(name, vector, dataLength);
+    if (lastOffset != dataLength) {
+      error(name, vector, String.format(
+          "Data vector has length %d, but offset vector has largest offset %d",
+          dataLength, lastOffset));
+    }
   }

   private void validateVarDecimalVector(String name, VarDecimalVector vector) {
@@ -340,9 +377,9 @@
     validateVarWidthVector(name, vector, dataLength);
   }

-  private void validateVarWidthVector(String name, VariableWidthVector vector, int dataLength) {
+  private int validateVarWidthVector(String name, VariableWidthVector vector, int dataLength) {
     int valueCount = vector.getAccessor().getValueCount();
-    validateOffsetVector(name + "-offsets", vector.getOffsetVector(),
+    return validateOffsetVector(name + "-offsets", vector.getOffsetVector(),
         valueCount, dataLength);
   }

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
index 5606d75..bbb988a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
@@ -23,25 +23,36 @@ import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;

-public class FragmentWritableBatch{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentWritableBatch.class);
+public class FragmentWritableBatch {

   private static RecordBatchDef EMPTY_DEF = RecordBatchDef.newBuilder().setRecordCount(0).build();

   private final ByteBuf[] buffers;
   private final FragmentRecordBatch header;

-  public FragmentWritableBatch(final boolean isLast, final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId, final int receiveMajorFragmentId, final int receiveMinorFragmentId, final WritableBatch batch){
-    this(isLast, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId, new int[]{receiveMinorFragmentId}, batch.getDef(), batch.getBuffers());
+  public FragmentWritableBatch(boolean isLast, QueryId queryId,
+      int sendMajorFragmentId, int sendMinorFragmentId,
+      int receiveMajorFragmentId, int receiveMinorFragmentId,
+      WritableBatch batch) {
+    this(isLast, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId,
+        new int[]{receiveMinorFragmentId}, batch.getDef(), batch.getBuffers());
   }

-  public FragmentWritableBatch(final boolean isLast, final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId, final int receiveMajorFragmentId, final int[] receiveMinorFragmentIds, final WritableBatch batch){
-    this(isLast, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId, receiveMinorFragmentIds, batch.getDef(), batch.getBuffers());
+  public FragmentWritableBatch(boolean isLast, QueryId queryId,
+      int sendMajorFragmentId, int sendMinorFragmentId,
+      int receiveMajorFragmentId, int[] receiveMinorFragmentIds,
+      WritableBatch batch) {
+    this(isLast, queryId, sendMajorFragmentId, sendMinorFragmentId,
+        receiveMajorFragmentId, receiveMinorFragmentIds, batch.getDef(),
+        batch.getBuffers());
   }

-  private FragmentWritableBatch(final boolean isLast, final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId, final int receiveMajorFragmentId, final int[] receiveMinorFragmentId, final RecordBatchDef def, final ByteBuf... buffers){
+  private FragmentWritableBatch(boolean isLast, QueryId queryId,
+      int sendMajorFragmentId, int sendMinorFragmentId,
+      int receiveMajorFragmentId, int[] receiveMinorFragmentId,
+      RecordBatchDef def, ByteBuf... buffers) {
     this.buffers = buffers;
-    final FragmentRecordBatch.Builder builder = FragmentRecordBatch.newBuilder()
+    FragmentRecordBatch.Builder builder = FragmentRecordBatch.newBuilder()
         .setIsLastBatch(isLast)
         .setDef(def)
         .setQueryId(queryId)
@@ -49,49 +60,60 @@ public class FragmentWritableBatch{
         .setSendingMajorFragmentId(sendMajorFragmentId)
         .setSendingMinorFragmentId(sendMinorFragmentId);

-    for(final int i : receiveMinorFragmentId){
-      builder.addReceivingMinorFragmentId(i);
+    for (int fragmentId : receiveMinorFragmentId) {
+      builder.addReceivingMinorFragmentId(fragmentId);
     }
     this.header = builder.build();
   }

-
-  public static FragmentWritableBatch getEmptyLast(final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId, final int receiveMajorFragmentId, final int receiveMinorFragmentId){
-    return getEmptyLast(queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId, new int[]{receiveMinorFragmentId});
+  public static FragmentWritableBatch getEmptyLast(QueryId queryId,
+      int sendMajorFragmentId, int sendMinorFragmentId,
+      int receiveMajorFragmentId, int receiveMinorFragmentId) {
+    return getEmptyLast(queryId, sendMajorFragmentId, sendMinorFragmentId,
+        receiveMajorFragmentId, new int[]{receiveMinorFragmentId});
   }

-  public static FragmentWritableBatch getEmptyLast(final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId, final int receiveMajorFragmentId, final int[] receiveMinorFragmentIds){
-    return new FragmentWritableBatch(true, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId, receiveMinorFragmentIds, EMPTY_DEF);
+  public static FragmentWritableBatch getEmptyLast(QueryId queryId,
+      int sendMajorFragmentId, int sendMinorFragmentId,
+      int receiveMajorFragmentId, int[] receiveMinorFragmentIds) {
+    return new FragmentWritableBatch(true, queryId, sendMajorFragmentId,
+        sendMinorFragmentId, receiveMajorFragmentId, receiveMinorFragmentIds,
+        EMPTY_DEF);
   }

-
-  public static FragmentWritableBatch getEmptyLastWithSchema(final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId,
-      final int receiveMajorFragmentId, final int receiveMinorFragmentId, final BatchSchema schema){
-    return getEmptyBatchWithSchema(true, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId,
+  public static FragmentWritableBatch getEmptyLastWithSchema(
+      QueryId queryId, int sendMajorFragmentId,
+      int sendMinorFragmentId, int receiveMajorFragmentId,
+      int receiveMinorFragmentId, BatchSchema schema) {
+    return getEmptyBatchWithSchema(true, queryId, sendMajorFragmentId,
+        sendMinorFragmentId, receiveMajorFragmentId,
         receiveMinorFragmentId, schema);
   }

-  public static FragmentWritableBatch getEmptyBatchWithSchema(final boolean isLast, final QueryId queryId, final int sendMajorFragmentId,
-      final int sendMinorFragmentId, final int receiveMajorFragmentId, final int receiveMinorFragmentId, final BatchSchema schema){
+  public static FragmentWritableBatch getEmptyBatchWithSchema(
+      boolean isLast, QueryId queryId, int sendMajorFragmentId,
+      int sendMinorFragmentId, int receiveMajorFragmentId,
+      int receiveMinorFragmentId, BatchSchema schema) {

-    final RecordBatchDef.Builder def = RecordBatchDef.newBuilder();
+    RecordBatchDef.Builder def = RecordBatchDef.newBuilder();
     if (schema != null) {
-      for (final MaterializedField field : schema) {
+      for (MaterializedField field : schema) {
        def.addField(field.getSerializedField());
      }
    }
-    return new FragmentWritableBatch(isLast, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId,
+    return new FragmentWritableBatch(isLast, queryId, sendMajorFragmentId,
+        sendMinorFragmentId, receiveMajorFragmentId,
         new int[] { receiveMinorFragmentId }, def.build());
   }

-  public ByteBuf[] getBuffers(){
+  public ByteBuf[] getBuffers() {
     return buffers;
   }

   public long getByteCount() {
     long n = 0;
-    for (final ByteBuf buf : buffers) {
+    for (ByteBuf buf : buffers) {
       n += buf.readableBytes();
     }
     return n;
@@ -99,11 +121,5 @@ public class FragmentWritableBatch{

   public FragmentRecordBatch getHeader() {
     return header;
-
   }
-
-
-
-
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index 577517d..966ade7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -163,8 +163,11 @@ public class WritableBatch implements AutoCloseable {
       vv.clear();
     }

-    RecordBatchDef batchDef = RecordBatchDef.newBuilder().addAllField(metadata).setRecordCount(recordCount)
-        .setCarriesTwoByteSelectionVector(isSV2).build();
+    RecordBatchDef batchDef = RecordBatchDef.newBuilder()
+        .addAllField(metadata)
+        .setRecordCount(recordCount)
+        .setCarriesTwoByteSelectionVector(isSV2)
+        .build();
     WritableBatch b = new WritableBatch(batchDef, buffers);
     return b;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 3e4d94a..79884e8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -48,60 +48,89 @@ import org.apache.drill.exec.testing.ControlsInjectorFactory;
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import static org.apache.drill.exec.server.FailureUtils.EXIT_CODE_HEAP_OOM;

 /**
  * <h2>Overview</h2>
  * <p>
- * Responsible for running a single fragment on a single Drillbit. Listens/responds to status request and cancellation messages.
+ * Responsible for running a single fragment on a single Drillbit.
+ * Listens/responds to status request and cancellation messages.
  * </p>
  * <h2>Theory of Operation</h2>
  * <p>
- * The {@link FragmentExecutor} runs a fragment's {@link RootExec} in the {@link FragmentExecutor#run()} method in a single thread. While a fragment is running
- * it may be subject to termination requests. The {@link FragmentExecutor} is reponsible for gracefully handling termination requests for the {@link RootExec}. There
- * are two types of termination messages:
- * <ol>
- * <li><b>Cancellation Request:</b> This signals that the fragment and therefore the {@link RootExec} need to terminate immediately.</li>
- * <li><b>Receiver Finished:</b> This signals that a downstream receiver no longer needs anymore data. A fragment may recieve multiple receiver finished requests
- * (one for each downstream receiver). The {@link RootExec} will only terminate once it has recieved {@link FragmentExecutor.EventType#RECEIVER_FINISHED} messages
- * for all downstream receivers.</li>
- * </ol>
+ * The {@link FragmentExecutor} runs a fragment's {@link RootExec} in the
+ * {@link FragmentExecutor#run()} method in a single thread. While a fragment is
+ * running it may be subject to termination requests. The
+ * {@link FragmentExecutor} is responsible for gracefully handling termination
+ * requests for the {@link RootExec}. There are two types of termination
+ * messages:
+ * <ol>
+ * <li><b>Cancellation Request:</b> This signals that the fragment and therefore
+ * the {@link RootExec} need to terminate immediately.</li>
+ * <li><b>Receiver Finished:</b> This signals that a downstream receiver no
+ * longer needs any more data. A fragment may receive multiple receiver finished
+ * requests (one for each downstream receiver). The {@link RootExec} will only
+ * terminate once it has received
+ * {@link FragmentExecutor.EventType#RECEIVER_FINISHED} messages for all
+ * downstream receivers.</li>
+ * </ol>
  * </p>
  * <p>
- * The {@link FragmentExecutor} processes termination requests appropriately for the {@link RootExec}. A <b>Cancellation Request</b> is signalled when
- * {@link FragmentExecutor#cancel()} is called. A <b>Receiver Finished</b> event is signalled when {@link FragmentExecutor#receivingFragmentFinished(FragmentHandle)} is
- * called. The way in which these signals are handled is the following:
+ * The {@link FragmentExecutor} processes termination requests appropriately for
+ * the {@link RootExec}. A <b>Cancellation Request</b> is signaled when
+ * {@link FragmentExecutor#cancel()} is called. A <b>Receiver Finished</b> event
+ * is signaled when
+ * {@link FragmentExecutor#receivingFragmentFinished(FragmentHandle)} is called.
+ * The way in which these signals are handled is the following:
  * </p>
  * <h3>Cancellation Request</h3>
  * <p>
- * There are two ways in which a cancellation request can be handled when {@link FragmentExecutor#cancel()} is called.
- * <ol>
- * <li>The Cancellation Request is recieved before the {@link RootExec} for the fragment is even started. In this case we can cleanup resources allocated for the fragment
- * and never start a {@link RootExec}</li>
- * <li>The Cancellation Request is recieve after the {@link RootExec} for the fragment is started. In this the cancellation request is sent to the
- * {@link FragmentEventProcessor}. If this is not the first cancellation request it is ignored. If this is the first cancellation request the {@link RootExec} for this
- * fragment is terminated by interrupting it. Then the {@link FragmentExecutor#run()} thread proceeds to cleanup resources normally</li>
- * </ol>
+ * There are two ways in which a cancellation request can be handled when
+ * {@link FragmentExecutor#cancel()} is called.
+ * <ol>
+ * <li>The Cancellation Request is received before the {@link RootExec} for the
+ * fragment is even started. In this case we can clean up resources allocated for
+ * the fragment and never start a {@link RootExec}</li>
+ * <li>The Cancellation Request is received after the {@link RootExec} for the
+ * fragment is started. In this case the cancellation request is sent to the
+ * {@link FragmentEventProcessor}. If this is not the first cancellation request
+ * it is ignored. If this is the first cancellation request the {@link RootExec}
+ * for this fragment is terminated by interrupting it. Then the
+ * {@link FragmentExecutor#run()} thread proceeds to clean up resources
+ * normally.</li>
+ * </ol>
  * </p>
  * <h3>Receiver Finished</h3>
  * <p>
- * When {@link FragmentExecutor#receivingFragmentFinished(FragmentHandle)} is called, the message is passed to the {@link FragmentEventProcessor} if we
- * did not already recieve a Cancellation request. Then the finished message is queued in {@link FragmentExecutor#receiverFinishedQueue}. The {@link FragmentExecutor#run()} polls
- * {@link FragmentExecutor#receiverFinishedQueue} and singlas the {@link RootExec} with {@link RootExec#receivingFragmentFinished(FragmentHandle)} appropriately.
+ * When {@link FragmentExecutor#receivingFragmentFinished(FragmentHandle)} is
+ * called, the message is passed to the {@link FragmentEventProcessor} if we did
+ * not already receive a Cancellation request. Then the finished message is
+ * queued in {@link FragmentExecutor#receiverFinishedQueue}. The
+ * {@link FragmentExecutor#run()} polls
+ * {@link FragmentExecutor#receiverFinishedQueue} and signals the
+ * {@link RootExec} with
+ * {@link RootExec#receivingFragmentFinished(FragmentHandle)} appropriately.
  * </p>
- * <h2>Possible Design Flaws / Poorly Defined Behavoir</h2>
+ * <h2>Possible Design Flaws / Poorly Defined Behavior</h2>
  * <p>
- * There are still a few aspects of the {@link FragmentExecutor} design that are not clear.
- * <ol>
- * <li>If we get a <b>Receiver Finished</b> message for one downstream receiver, will we eventually get one from every downstream receiver?</li>
- * <li>What happens when we process a <b>Receiver Finished</b> message for some (but not all) downstream receivers and then we cancel the fragment?</li>
- * <li>What happens when we process a <b>Receiver Finished</b> message for some (but not all) downstream receivers and then we run out of data from the upstream?</li>
- * </ol>
+ * There are still a few aspects of the {@link FragmentExecutor} design that are
+ * not clear.
+ * <ol>
+ * <li>If we get a <b>Receiver Finished</b> message for one downstream receiver,
+ * will we eventually get one from every downstream receiver?</li>
+ * <li>What happens when we process a <b>Receiver Finished</b> message for some
+ * (but not all) downstream receivers and then we cancel the fragment?</li>
+ * <li>What happens when we process a <b>Receiver Finished</b> message for some
+ * (but not all) downstream receivers and then we run out of data from the
+ * upstream?</li>
+ * </ol>
  * </p>
  */
 public class FragmentExecutor implements Runnable {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentExecutor.class);
+  private static final Logger logger = LoggerFactory.getLogger(FragmentExecutor.class);
   private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(FragmentExecutor.class);

   private final String fragmentName;
@@ -542,7 +571,7 @@ public class FragmentExecutor implements Runnable {
    * This is especially important as fragments can take longer to start
    */
   private class FragmentEventProcessor extends EventProcessor<FragmentEvent> {
-    private AtomicBoolean terminate = new AtomicBoolean(false);
+    private final AtomicBoolean terminate = new AtomicBoolean(false);

     void cancel() {
       sendEvent(new FragmentEvent(EventType.CANCEL, null));
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
index 6c7176b..2352d0a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -86,7 +86,7 @@ public class QueryBuilder {
     private QueryId queryId;
     private int recordCount;
     private int batchCount;
-    private long startTime;
+    private final long startTime;

     public SummaryOnlyQueryEventListener(QuerySummaryFuture future) {
       this.future = future;
@@ -132,7 +132,7 @@ public class QueryBuilder {
      * launched the query.
      */

-    private CountDownLatch lock = new CountDownLatch(1);
+    private final CountDownLatch lock = new CountDownLatch(1);
     private QuerySummary summary;

     /**
@@ -373,7 +373,7 @@ public class QueryBuilder {

     // Unload the batch and convert to a row set.

-    final RecordBatchLoader loader = new RecordBatchLoader(client.allocator());
+    RecordBatchLoader loader = new RecordBatchLoader(client.allocator());
     try {
       loader.load(resultBatch.getHeader().getDef(), resultBatch.getData());
       resultBatch.release();
@@ -760,18 +760,18 @@
    */
   private String queryPlan(String columnName) throws Exception {
     Preconditions.checkArgument(queryType == QueryType.SQL, "Can only explain an SQL query.");
-    final List<QueryDataBatch> results = results();
-    final RecordBatchLoader loader = new RecordBatchLoader(client.allocator());
-    final StringBuilder builder = new StringBuilder();
+    List<QueryDataBatch> results = results();
+    RecordBatchLoader loader = new RecordBatchLoader(client.allocator());
+    StringBuilder builder = new StringBuilder();

-    for (final QueryDataBatch b : results) {
+    for (QueryDataBatch b : results) {
       if (!b.hasData()) {
         continue;
       }
       loader.load(b.getHeader().getDef(), b.getData());
-      final VectorWrapper<?> vw;
+      VectorWrapper<?> vw;
       try {
         vw = loader.getValueAccessorById(
             NullableVarCharVector.class,
@@ -780,9 +780,9 @@
         throw new IllegalStateException("Looks like you did not provide an explain plan query, please add EXPLAIN PLAN FOR to the beginning of your query.");
       }

-      final ValueVector vv = vw.getValueVector();
+      ValueVector vv = vw.getValueVector();
       for (int i = 0; i < vv.getAccessor().getValueCount(); i++) {
-        final Object o = vv.getAccessor().getObject(i);
+        Object o = vv.getAccessor().getObject(i);
         builder.append(o);
       }
       loader.clear();
diff --git a/exec/vector/src/main/codegen/templates/NullableValueVectors.java b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
index e32ecc9..d92cb5d 100644
--- a/exec/vector/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -299,6 +299,15 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     int bitsLength = bitsField.getBufferLength();
     SerializedField valuesField = metadata.getChild(1);
     values.load(valuesField, buffer.slice(bitsLength, capacity - bitsLength));
+    <#if type.major == "VarLen">
+
+    // Though a loaded vector should be read only,
+    // it can have its values set such as when copying
+    // with transfer pairs. Since lastSet is used when
+    // setting values, it must be set on vector load.
+
+    mutator.lastSet = accessor.getValueCount() - 1;
+    </#if>
   }

   @Override
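
For readers who have not worked with Drill's nullable variable-width vectors, the "fill empties" step that the commit message refers to behaves roughly like the sketch below. This is a simplified, conceptual rendering in plain Java over an int[] of offsets; the real logic lives in the generated mutator code (see NullableValueVectors.java above), and the helper name here is made up for illustration.

// Conceptual sketch only: models what setValueCount() does for a nullable
// variable-width vector. The offsets array has valueCount + 1 entries.
static void fillEmpties(int[] offsets, int lastSet, int valueCount) {
  // Every slot after lastSet is assumed to be empty (null), so its ending
  // offset is back-filled from the entry before it.
  for (int i = lastSet + 1; i < valueCount; i++) {
    offsets[i + 1] = offsets[i];
  }
}

If lastSet is stale at -1 on a vector that already holds data, the loop back-fills every entry from offsets[0], which is 0; that is exactly the "overwrite existing offsets with 0" behavior described in the commit message. Setting lastSet to the index of the last row makes the loop a no-op.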

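To see where a stale lastSet bites in practice, here is a minimal, illustrative sketch (not part of the commit) that copies a nullable VarChar vector through a transfer pair, much as an exchange or selection-vector remover would, and then sets the value count on the copy. The class and method names are invented for this example, and it assumes a Drill BufferAllocator is available.

import java.nio.charset.StandardCharsets;

import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.vector.NullableVarCharVector;

// Illustrative only; not part of DRILL-7476.
public class LastSetTransferSketch {

  static void demo(BufferAllocator allocator) {
    MaterializedField field =
        MaterializedField.create("col", Types.optional(MinorType.VARCHAR));
    byte[] value = "abc".getBytes(StandardCharsets.UTF_8);

    try (NullableVarCharVector source = new NullableVarCharVector(field, allocator)) {
      source.allocateNew();
      for (int i = 0; i < 10; i++) {
        source.getMutator().setSafe(i, value, 0, value.length);
      }
      source.getMutator().setValueCount(10);

      // Hand the buffers to another vector, as an exchange or SV remover does.
      TransferPair tp = source.getTransferPair(allocator);
      tp.transfer();
      NullableVarCharVector copy = (NullableVarCharVector) tp.getTo();

      // setValueCount() runs the "fill empties" logic, which reads lastSet.
      // If the transfer (or a vector load) did not carry lastSet forward,
      // every row would be treated as empty and the offsets that were just
      // transferred would be overwritten with 0.
      copy.getMutator().setValueCount(10);
      copy.clear();
    }
  }
}

The BatchValidator change above guards the same invariant from the other side: for nullable variable-width vectors it now reports batches whose lastSet does not match the value count.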