Repository: asterixdb Updated Branches: refs/heads/master bc918f7ff -> 536c707dc
[NO ISSUE][OTH] Appender flush call with tracing call normal flush - user model changes: no - storage format changes: no - interface changes: no details: - The flush with tracing now calls the normal flush. This enables sub appenders overriding the flush call to maintain correctness. Change-Id: I3f649798fa4cac049f66cc3621acdb28b1c94694 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2080 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Reviewed-by: Till Westmann <[email protected]> Integration-Tests: Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/536c707d Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/536c707d Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/536c707d Branch: refs/heads/master Commit: 536c707dc2ffc92a9e2f331b8765697367d9ab3a Parents: bc918f7 Author: Abdullah Alamoudi <[email protected]> Authored: Tue Oct 17 22:11:01 2017 -0700 Committer: Michael Blow <[email protected]> Committed: Wed Oct 18 03:42:36 2017 -0700 ---------------------------------------------------------------------- .../ConnectorDescriptorWithMessagingTest.java | 27 +++++++++++--------- .../common/comm/io/AbstractFrameAppender.java | 8 +++--- .../std/connectors/PartitionDataWriter.java | 9 +++++++ 3 files changed, 28 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/536c707d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java index b1c7ff3..56a45c6 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java @@ -49,6 +49,7 @@ import org.apache.hyracks.dataflow.common.utils.TaskUtil; import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningWithMessageConnectorDescriptor; import org.apache.hyracks.dataflow.std.connectors.PartitionWithMessageDataWriter; import org.apache.hyracks.test.support.TestUtils; +import org.apache.hyracks.util.trace.ITracer; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; @@ -81,8 +82,9 @@ public class ConnectorDescriptorWithMessagingTest { BooleanSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() }; RecordDescriptor rDesc = new RecordDescriptor(serdes); TestPartitionWriterFactory partitionWriterFactory = new TestPartitionWriterFactory(); - IFrameWriter partitioner = connector.createPartitioner(ctx, rDesc, partitionWriterFactory, - CURRENT_PRODUCER, NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS); + PartitionWithMessageDataWriter partitioner = + (PartitionWithMessageDataWriter) connector.createPartitioner(ctx, rDesc, partitionWriterFactory, + CURRENT_PRODUCER, NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS); List<TestFrameWriter> recipients = new ArrayList<>(); try { partitioner.open(); @@ -90,7 +92,7 @@ public class ConnectorDescriptorWithMessagingTest { for (IFrameWriter writer : partitionWriterFactory.getWriters().values()) { recipients.add((TestFrameWriter) writer); } - partitioner.flush(); + partitioner.flush(ITracer.NONE, null, null, null); for (TestFrameWriter writer : recipients) { Assert.assertEquals(writer.nextFrameCount(), 1); fta.reset(writer.getLastFrame()); @@ -102,7 +104,7 @@ public class ConnectorDescriptorWithMessagingTest { message.getBuffer().clear(); message.getBuffer().put(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE); message.getBuffer().flip(); - partitioner.flush(); + partitioner.flush(ITracer.NONE, null, null, null);; for (TestFrameWriter writer : recipients) { Assert.assertEquals(writer.nextFrameCount(), 2); fta.reset(writer.getLastFrame()); @@ -115,7 +117,7 @@ public class ConnectorDescriptorWithMessagingTest { message.getBuffer().clear(); message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE); message.getBuffer().flip(); - partitioner.flush(); + partitioner.flush(ITracer.NONE, null, null, null);; for (TestFrameWriter writer : recipients) { Assert.assertEquals(writer.nextFrameCount(), 3); fta.reset(writer.getLastFrame()); @@ -159,15 +161,16 @@ public class ConnectorDescriptorWithMessagingTest { BooleanSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() }; RecordDescriptor rDesc = new RecordDescriptor(serdes); TestPartitionWriterFactory partitionWriterFactory = new TestPartitionWriterFactory(); - IFrameWriter partitioner = connector.createPartitioner(ctx, rDesc, partitionWriterFactory, CURRENT_PRODUCER, - NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS); + PartitionWithMessageDataWriter partitioner = + (PartitionWithMessageDataWriter) connector.createPartitioner(ctx, rDesc, partitionWriterFactory, + CURRENT_PRODUCER, NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS); partitioner.open(); FrameTupleAccessor fta = new FrameTupleAccessor(rDesc); List<TestFrameWriter> recipients = new ArrayList<>(); for (IFrameWriter writer : partitionWriterFactory.getWriters().values()) { recipients.add((TestFrameWriter) writer); } - partitioner.flush(); + partitioner.flush(ITracer.NONE, null, null, null);; for (TestFrameWriter writer : recipients) { Assert.assertEquals(writer.nextFrameCount(), 1); fta.reset(writer.getLastFrame()); @@ -179,7 +182,7 @@ public class ConnectorDescriptorWithMessagingTest { message.getBuffer().clear(); message.getBuffer().put(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE); message.getBuffer().flip(); - partitioner.flush(); + partitioner.flush(ITracer.NONE, null, null, null);; for (TestFrameWriter writer : recipients) { Assert.assertEquals(writer.nextFrameCount(), 2); fta.reset(writer.getLastFrame()); @@ -191,7 +194,7 @@ public class ConnectorDescriptorWithMessagingTest { message.getBuffer().clear(); message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE); message.getBuffer().flip(); - partitioner.flush(); + partitioner.flush(ITracer.NONE, null, null, null);; for (TestFrameWriter writer : recipients) { Assert.assertEquals(writer.nextFrameCount(), 3); fta.reset(writer.getLastFrame()); @@ -262,7 +265,7 @@ public class ConnectorDescriptorWithMessagingTest { tuple = ttg.next(); } partitioner.nextFrame(frame.getBuffer()); - partitioner.flush(); + partitioner.flush(ITracer.NONE, null, null, null);; Assert.assertEquals(1, partitionWriterFactory.getWriters().get(0).nextFrameCount()); Assert.assertEquals(2, partitionWriterFactory.getWriters().get(1).nextFrameCount()); Assert.assertEquals(1, partitionWriterFactory.getWriters().get(2).nextFrameCount()); @@ -321,7 +324,7 @@ public class ConnectorDescriptorWithMessagingTest { appender.append(tuple); } partitioner.nextFrame(frame.getBuffer()); - partitioner.flush(); + partitioner.flush(ITracer.NONE, null, null, null);; Assert.assertEquals(partitionWriterFactory.getWriters().get(0).nextFrameCount(), 1); Assert.assertEquals(partitionWriterFactory.getWriters().get(1).nextFrameCount(), 1); Assert.assertEquals(partitionWriterFactory.getWriters().get(2).nextFrameCount(), 1); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/536c707d/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java index 13632f0..a377d75 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java @@ -120,10 +120,10 @@ public class AbstractFrameAppender implements IFrameAppender { public void flush(IFrameWriter writer, ITracer tracer, String name, String cat, String args) throws HyracksDataException { final long tid = ITracer.check(tracer).durationB(name, cat, args); - if (tupleCount > 0) { - write(writer, true); + try { + flush(writer); + } finally { + ITracer.check(tracer).durationE(tid, args); } - writer.flush(); - ITracer.check(tracer).durationE(tid, args); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/536c707d/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java index 189ce9d..4705001 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java @@ -31,6 +31,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; +import org.apache.hyracks.util.trace.ITracer; public class PartitionDataWriter implements IFrameWriter { private final int consumerPartitionCount; @@ -163,6 +164,14 @@ public class PartitionDataWriter implements IFrameWriter { } } + public void flush(ITracer tracer, String name, String cat, String args) throws HyracksDataException { + for (int i = 0; i < consumerPartitionCount; i++) { + if (allocatedFrames[i]) { + appenders[i].flush(pWriters[i], tracer, name, cat, args); + } + } + } + // Wraps the current encountered exception into the final exception. private HyracksDataException wrapException(HyracksDataException finalException, Exception currentException) { if (finalException == null) {
