This is an automated email from the ASF dual-hosted git repository.
dlych pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new fba02b9 [NO ISSUE][COMP] Introduce connector for partial broadcasts
fba02b9 is described below
commit fba02b9f58a0ecfe0375739e33e2d5f237bb8a5d
Author: Stephen Ermshar <[email protected]>
AuthorDate: Tue Aug 13 22:26:47 2019 -0700
[NO ISSUE][COMP] Introduce connector for partial broadcasts
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Introduce MToNPartialBroadcastConnectorDescriptor
- Refactor FieldRangePartitionComputerFactory to use RangeMapSupplier
Change-Id: I4a6f8f17d1709862300db7ab386161b4dfbfee5a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3524
Tested-by: Jenkins <[email protected]>
Reviewed-by: Dmitry Lychagin <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Ali Alsuliman <[email protected]>
---
.../physical/RangePartitionExchangePOperator.java | 17 +--
.../RangePartitionMergeExchangePOperator.java | 6 +-
.../value/ITupleMultiPartitionComputer.java | 47 +++++++
.../ITupleMultiPartitionComputerFactory.java} | 20 +--
...erFactory.java => DynamicRangeMapSupplier.java} | 24 +---
.../range/FieldRangePartitionComputerFactory.java | 22 ++-
...nComputerFactory.java => RangeMapSupplier.java} | 20 +--
...terFactory.java => StaticRangeMapSupplier.java} | 14 +-
...riter.java => AbstractPartitionDataWriter.java} | 33 ++---
.../MToNPartialBroadcastConnectorDescriptor.java | 50 +++++++
.../std/connectors/MultiPartitionDataWriter.java | 54 ++++++++
.../std/connectors/PartitionDataWriter.java | 150 +--------------------
12 files changed, 226 insertions(+), 231 deletions(-)
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
index bb0081a..fe96d4f 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
@@ -45,10 +45,11 @@ import
org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
-import
org.apache.hyracks.dataflow.common.data.partition.range.DynamicFieldRangePartitionComputerFactory;
+import
org.apache.hyracks.dataflow.common.data.partition.range.DynamicRangeMapSupplier;
import
org.apache.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;
-import
org.apache.hyracks.dataflow.common.data.partition.range.StaticFieldRangePartitionComputerFactory;
+import
org.apache.hyracks.dataflow.common.data.partition.range.RangeMapSupplier;
+import
org.apache.hyracks.dataflow.common.data.partition.range.StaticRangeMapSupplier;
import
org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
public class RangePartitionExchangePOperator extends AbstractExchangePOperator
{
@@ -119,14 +120,10 @@ public class RangePartitionExchangePOperator extends
AbstractExchangePOperator {
comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() ==
OrderKind.ASC);
i++;
}
- FieldRangePartitionComputerFactory partitionerFactory;
- if (rangeMapIsComputedAtRunTime) {
- partitionerFactory = new
DynamicFieldRangePartitionComputerFactory(sortFields, comps,
rangeMapKeyInContext,
- op.getSourceLocation());
- } else {
- partitionerFactory = new
StaticFieldRangePartitionComputerFactory(sortFields, comps, rangeMap);
- }
-
+ RangeMapSupplier rangeMapSupplier = rangeMapIsComputedAtRunTime
+ ? new DynamicRangeMapSupplier(rangeMapKeyInContext) : new
StaticRangeMapSupplier(rangeMap);
+ FieldRangePartitionComputerFactory partitionerFactory =
+ new FieldRangePartitionComputerFactory(sortFields, comps,
rangeMapSupplier, op.getSourceLocation());
IConnectorDescriptor conn = new
MToNPartitioningConnectorDescriptor(spec, partitionerFactory);
return new Pair<>(conn, null);
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
index 15a2d8f..ba4a6b7 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
@@ -52,8 +52,9 @@ import
org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
+import
org.apache.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;
-import
org.apache.hyracks.dataflow.common.data.partition.range.StaticFieldRangePartitionComputerFactory;
+import
org.apache.hyracks.dataflow.common.data.partition.range.StaticRangeMapSupplier;
import
org.apache.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
public class RangePartitionMergeExchangePOperator extends
AbstractExchangePOperator {
@@ -138,7 +139,8 @@ public class RangePartitionMergeExchangePOperator extends
AbstractExchangePOpera
comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() ==
OrderKind.ASC);
i++;
}
- ITuplePartitionComputerFactory tpcf = new
StaticFieldRangePartitionComputerFactory(sortFields, comps, rangeMap);
+ ITuplePartitionComputerFactory tpcf = new
FieldRangePartitionComputerFactory(sortFields, comps,
+ new StaticRangeMapSupplier(rangeMap), op.getSourceLocation());
IConnectorDescriptor conn = new
MToNPartitioningMergingConnectorDescriptor(spec, tpcf, sortFields, comps, nkcf);
return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleMultiPartitionComputer.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleMultiPartitionComputer.java
new file mode 100644
index 0000000..13c5ed8
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleMultiPartitionComputer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.dataflow.value;
+
+import java.util.BitSet;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface ITupleMultiPartitionComputer {
+ /**
+ * For the tuple (located at tIndex in the frame), it determines which
target partitions (0,1,... nParts-1) the
+ * tuple should be sent/written to.
+ * @param accessor The accessor of the frame to access tuples
+ * @param tIndex The index of the tuple in consideration
+ * @param nParts The number of target partitions
+ * @return The chosen target partitions as dictated by the logic of the
partition computer
+ * @throws HyracksDataException
+ */
+ BitSet partition(IFrameTupleAccessor accessor, int tIndex, int nParts)
throws HyracksDataException;
+
+ /**
+ * Gives the data partitioner a chance to set up its environment before it
starts partitioning tuples. This method
+ * should be called in the open() of {@link
org.apache.hyracks.api.comm.IFrameWriter}. The default implementation
+ * is "do nothing".
+ * @throws HyracksDataException
+ */
+ default void initialize() throws HyracksDataException {
+ }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleMultiPartitionComputerFactory.java
similarity index 56%
copy from
hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java
copy to
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleMultiPartitionComputerFactory.java
index b17c550..c8821c3 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleMultiPartitionComputerFactory.java
@@ -16,23 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.dataflow.common.data.partition.range;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+package org.apache.hyracks.api.dataflow.value;
-public class StaticFieldRangePartitionComputerFactory extends
FieldRangePartitionComputerFactory {
- private static final long serialVersionUID = 1L;
- private RangeMap rangeMap;
+import java.io.Serializable;
- public StaticFieldRangePartitionComputerFactory(int[] rangeFields,
IBinaryComparatorFactory[] comparatorFactories,
- RangeMap rangeMap) {
- super(rangeFields, comparatorFactories);
- this.rangeMap = rangeMap;
- }
+import org.apache.hyracks.api.context.IHyracksTaskContext;
- @Override
- protected RangeMap getRangeMap(IHyracksTaskContext hyracksTaskContext) {
- return rangeMap;
- }
+public interface ITupleMultiPartitionComputerFactory extends Serializable {
+ ITupleMultiPartitionComputer createPartitioner(IHyracksTaskContext
hyracksTaskContext);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/DynamicFieldRangePartitionComputerFactory.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/DynamicRangeMapSupplier.java
similarity index 52%
rename from
hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/DynamicFieldRangePartitionComputerFactory.java
rename to
hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/DynamicRangeMapSupplier.java
index bc642a9..cfc4b82 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/DynamicFieldRangePartitionComputerFactory.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/DynamicRangeMapSupplier.java
@@ -16,33 +16,23 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.hyracks.dataflow.common.data.partition.range;
import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.exceptions.ErrorCode;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.dataflow.common.utils.TaskUtil;
-public class DynamicFieldRangePartitionComputerFactory extends
FieldRangePartitionComputerFactory {
+public final class DynamicRangeMapSupplier implements RangeMapSupplier {
+
private static final long serialVersionUID = 1L;
+
private final String rangeMapKeyInContext;
- private final SourceLocation sourceLocation;
- public DynamicFieldRangePartitionComputerFactory(int[] rangeFields,
IBinaryComparatorFactory[] comparatorFactories,
- String rangeMapKeyInContext, SourceLocation sourceLocation) {
- super(rangeFields, comparatorFactories);
+ public DynamicRangeMapSupplier(String rangeMapKeyInContext) {
this.rangeMapKeyInContext = rangeMapKeyInContext;
- this.sourceLocation = sourceLocation;
}
- @Override
- protected RangeMap getRangeMap(IHyracksTaskContext hyracksTaskContext)
throws HyracksDataException {
- RangeMap rangeMap = TaskUtil.get(rangeMapKeyInContext,
hyracksTaskContext);
- if (rangeMap == null) {
- throw HyracksDataException.create(ErrorCode.RANGEMAP_NOT_FOUND,
sourceLocation);
- }
- return rangeMap;
+ public RangeMap getRangeMap(IHyracksTaskContext taskContext) {
+ return TaskUtil.get(rangeMapKeyInContext, taskContext);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
index 55d4420..1831a5f 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
@@ -24,22 +24,27 @@ import
org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
-public abstract class FieldRangePartitionComputerFactory implements
ITuplePartitionComputerFactory {
+public final class FieldRangePartitionComputerFactory implements
ITuplePartitionComputerFactory {
private static final long serialVersionUID = 1L;
private final int[] rangeFields;
- private IBinaryComparatorFactory[] comparatorFactories;
+ private final IBinaryComparatorFactory[] comparatorFactories;
+ private final RangeMapSupplier rangeMapSupplier;
+ private final SourceLocation sourceLocation;
- protected FieldRangePartitionComputerFactory(int[] rangeFields,
IBinaryComparatorFactory[] comparatorFactories) {
+ public FieldRangePartitionComputerFactory(int[] rangeFields,
IBinaryComparatorFactory[] comparatorFactories,
+ RangeMapSupplier rangeMapSupplier, SourceLocation sourceLocation) {
this.rangeFields = rangeFields;
+ this.rangeMapSupplier = rangeMapSupplier;
this.comparatorFactories = comparatorFactories;
+ this.sourceLocation = sourceLocation;
}
- protected abstract RangeMap getRangeMap(IHyracksTaskContext
hyracksTaskContext) throws HyracksDataException;
-
@Override
- public ITuplePartitionComputer createPartitioner(IHyracksTaskContext
hyracksTaskContext) {
+ public ITuplePartitionComputer createPartitioner(IHyracksTaskContext
taskContext) {
final IBinaryComparator[] comparators = new
IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -50,7 +55,10 @@ public abstract class FieldRangePartitionComputerFactory
implements ITuplePartit
@Override
public void initialize() throws HyracksDataException {
- rangeMap = getRangeMap(hyracksTaskContext);
+ rangeMap = rangeMapSupplier.getRangeMap(taskContext);
+ if (rangeMap == null) {
+ throw
HyracksDataException.create(ErrorCode.RANGEMAP_NOT_FOUND, sourceLocation);
+ }
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMapSupplier.java
similarity index 60%
copy from
hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java
copy to
hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMapSupplier.java
index b17c550..fc0911e 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMapSupplier.java
@@ -16,23 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.dataflow.common.data.partition.range;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+package org.apache.hyracks.dataflow.common.data.partition.range;
-public class StaticFieldRangePartitionComputerFactory extends
FieldRangePartitionComputerFactory {
- private static final long serialVersionUID = 1L;
- private RangeMap rangeMap;
+import java.io.Serializable;
- public StaticFieldRangePartitionComputerFactory(int[] rangeFields,
IBinaryComparatorFactory[] comparatorFactories,
- RangeMap rangeMap) {
- super(rangeFields, comparatorFactories);
- this.rangeMap = rangeMap;
- }
+import org.apache.hyracks.api.context.IHyracksTaskContext;
- @Override
- protected RangeMap getRangeMap(IHyracksTaskContext hyracksTaskContext) {
- return rangeMap;
- }
+public interface RangeMapSupplier extends Serializable {
+ RangeMap getRangeMap(IHyracksTaskContext taskContext);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticRangeMapSupplier.java
similarity index 69%
rename from
hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java
rename to
hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticRangeMapSupplier.java
index b17c550..613ccc5 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticRangeMapSupplier.java
@@ -16,23 +16,23 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.hyracks.dataflow.common.data.partition.range;
import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-public class StaticFieldRangePartitionComputerFactory extends
FieldRangePartitionComputerFactory {
+public final class StaticRangeMapSupplier implements RangeMapSupplier {
+
private static final long serialVersionUID = 1L;
- private RangeMap rangeMap;
- public StaticFieldRangePartitionComputerFactory(int[] rangeFields,
IBinaryComparatorFactory[] comparatorFactories,
- RangeMap rangeMap) {
- super(rangeFields, comparatorFactories);
+ private final RangeMap rangeMap;
+
+ public StaticRangeMapSupplier(RangeMap rangeMap) {
this.rangeMap = rangeMap;
}
@Override
- protected RangeMap getRangeMap(IHyracksTaskContext hyracksTaskContext) {
+ public RangeMap getRangeMap(IHyracksTaskContext taskContext) {
return rangeMap;
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/AbstractPartitionDataWriter.java
similarity index 87%
copy from
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
copy to
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/AbstractPartitionDataWriter.java
index d06d5d3..03f260a 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/AbstractPartitionDataWriter.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.hyracks.dataflow.std.connectors;
import java.io.IOException;
@@ -25,7 +26,6 @@ import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.IPartitionWriterFactory;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -33,21 +33,20 @@ import
org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
import org.apache.hyracks.util.trace.ITracer;
-public class PartitionDataWriter implements IFrameWriter {
- private final int consumerPartitionCount;
+abstract class AbstractPartitionDataWriter implements IFrameWriter {
+
+ protected final int consumerPartitionCount;
private final IFrameWriter[] pWriters;
private final boolean[] isOpen;
private final FrameTupleAppender[] appenders;
- private final FrameTupleAccessor tupleAccessor;
- private final ITuplePartitionComputer tpc;
- private final IHyracksTaskContext ctx;
+ protected final FrameTupleAccessor tupleAccessor;
+ protected final IHyracksTaskContext ctx;
private boolean[] allocatedFrames;
private boolean failed = false;
- public PartitionDataWriter(IHyracksTaskContext ctx, int
consumerPartitionCount, IPartitionWriterFactory pwFactory,
- RecordDescriptor recordDescriptor, ITuplePartitionComputer tpc)
throws HyracksDataException {
+ public AbstractPartitionDataWriter(IHyracksTaskContext ctx, int
consumerPartitionCount,
+ IPartitionWriterFactory pwFactory, RecordDescriptor
recordDescriptor) throws HyracksDataException {
this.ctx = ctx;
- this.tpc = tpc;
this.consumerPartitionCount = consumerPartitionCount;
pWriters = new IFrameWriter[consumerPartitionCount];
isOpen = new boolean[consumerPartitionCount];
@@ -113,7 +112,6 @@ public class PartitionDataWriter implements IFrameWriter {
@Override
public void open() throws HyracksDataException {
- tpc.initialize();
for (int i = 0; i < pWriters.length; ++i) {
isOpen[i] = true;
pWriters[i].open();
@@ -125,12 +123,17 @@ public class PartitionDataWriter implements IFrameWriter {
tupleAccessor.reset(buffer);
int tupleCount = tupleAccessor.getTupleCount();
for (int i = 0; i < tupleCount; ++i) {
- int h = tpc.partition(tupleAccessor, i, consumerPartitionCount);
- if (!allocatedFrames[h]) {
- allocateFrames(h);
- }
- FrameUtils.appendToWriter(pWriters[h], appenders[h],
tupleAccessor, i);
+ processTuple(i);
+ }
+ }
+
+ protected abstract void processTuple(int tupleIndex) throws
HyracksDataException;
+
+ protected void appendToPartitionWriter(int tupleIndex, int partition)
throws HyracksDataException {
+ if (!allocatedFrames[partition]) {
+ allocateFrames(partition);
}
+ FrameUtils.appendToWriter(pWriters[partition], appenders[partition],
tupleAccessor, tupleIndex);
}
protected void allocateFrames(int i) throws HyracksDataException {
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartialBroadcastConnectorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartialBroadcastConnectorDescriptor.java
new file mode 100644
index 0000000..5d79f8c
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartialBroadcastConnectorDescriptor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.connectors;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.IPartitionWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import
org.apache.hyracks.api.dataflow.value.ITupleMultiPartitionComputerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
+
+public class MToNPartialBroadcastConnectorDescriptor extends
AbstractMToNConnectorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+
+ protected ITupleMultiPartitionComputerFactory tpcf;
+
+ public
MToNPartialBroadcastConnectorDescriptor(IConnectorDescriptorRegistry spec,
+ ITupleMultiPartitionComputerFactory tpcf) {
+ super(spec);
+ this.tpcf = tpcf;
+ }
+
+ @Override
+ public IFrameWriter createPartitioner(IHyracksTaskContext ctx,
RecordDescriptor recordDesc,
+ IPartitionWriterFactory edwFactory, int index, int
nProducerPartitions, int nConsumerPartitions)
+ throws HyracksDataException {
+ return new MultiPartitionDataWriter(ctx, nConsumerPartitions,
edwFactory, recordDesc,
+ tpcf.createPartitioner(ctx));
+ }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MultiPartitionDataWriter.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MultiPartitionDataWriter.java
new file mode 100644
index 0000000..aed39df
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MultiPartitionDataWriter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.connectors;
+
+import java.util.BitSet;
+
+import org.apache.hyracks.api.comm.IPartitionWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITupleMultiPartitionComputer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class MultiPartitionDataWriter extends AbstractPartitionDataWriter {
+
+ private final ITupleMultiPartitionComputer tpc;
+
+ public MultiPartitionDataWriter(IHyracksTaskContext ctx, int
consumerPartitionCount,
+ IPartitionWriterFactory pwFactory, RecordDescriptor
recordDescriptor, ITupleMultiPartitionComputer tpc)
+ throws HyracksDataException {
+ super(ctx, consumerPartitionCount, pwFactory, recordDescriptor);
+ this.tpc = tpc;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ super.open();
+ tpc.initialize();
+ }
+
+ @Override
+ protected void processTuple(int tupleIndex) throws HyracksDataException {
+ BitSet partitionSet = tpc.partition(tupleAccessor, tupleIndex,
consumerPartitionCount);
+ for (int p = partitionSet.nextSetBit(0); p >= 0; p =
partitionSet.nextSetBit(p + 1)) {
+ appendToPartitionWriter(tupleIndex, p);
+ }
+ }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
index d06d5d3..e67f9a8 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
@@ -18,167 +18,31 @@
*/
package org.apache.hyracks.dataflow.std.connectors;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.IPartitionWriterFactory;
-import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.util.trace.ITracer;
-public class PartitionDataWriter implements IFrameWriter {
- private final int consumerPartitionCount;
- private final IFrameWriter[] pWriters;
- private final boolean[] isOpen;
- private final FrameTupleAppender[] appenders;
- private final FrameTupleAccessor tupleAccessor;
+/**
+ * Partition data writer that routes each incoming tuple to exactly one consumer partition,
+ * the one chosen by an {@link ITuplePartitionComputer}.
+ */
+public class PartitionDataWriter extends AbstractPartitionDataWriter {
+
private final ITuplePartitionComputer tpc;
- private final IHyracksTaskContext ctx;
- private boolean[] allocatedFrames;
- private boolean failed = false;
+ /**
+ * Creates a writer over {@code consumerPartitionCount} consumer partitions. All arguments
+ * except {@code tpc} are forwarded to the base writer.
+ *
+ * @param ctx task context
+ * @param consumerPartitionCount number of consumer partitions; also passed to the partition
+ * computer for every tuple
+ * @param pwFactory factory for the per-partition writers
+ * @param recordDescriptor descriptor of the incoming records
+ * @param tpc selects the single target partition of each tuple
+ * @throws HyracksDataException if the base writer cannot be constructed
+ */
public PartitionDataWriter(IHyracksTaskContext ctx, int
consumerPartitionCount, IPartitionWriterFactory pwFactory,
RecordDescriptor recordDescriptor, ITuplePartitionComputer tpc)
throws HyracksDataException {
- this.ctx = ctx;
+ super(ctx, consumerPartitionCount, pwFactory, recordDescriptor);
this.tpc = tpc;
- this.consumerPartitionCount = consumerPartitionCount;
- pWriters = new IFrameWriter[consumerPartitionCount];
- isOpen = new boolean[consumerPartitionCount];
- allocatedFrames = new boolean[consumerPartitionCount];
- appenders = new FrameTupleAppender[consumerPartitionCount];
- tupleAccessor = new FrameTupleAccessor(recordDescriptor);
- initializeAppenders(pwFactory);
- }
-
- protected void initializeAppenders(IPartitionWriterFactory pwFactory)
throws HyracksDataException {
- for (int i = 0; i < consumerPartitionCount; ++i) {
- try {
- pWriters[i] = pwFactory.createFrameWriter(i);
- appenders[i] = createTupleAppender(ctx);
- } catch (IOException e) {
- throw HyracksDataException.create(e);
- }
- }
- }
-
- protected FrameTupleAppender createTupleAppender(IHyracksTaskContext ctx) {
- return new FrameTupleAppender();
- }
-
- @Override
- public void close() throws HyracksDataException {
- HyracksDataException closeException = null;
- if (!failed) {
- boolean newFailure = false;
- for (int i = 0; i < pWriters.length; ++i) {
- try {
- if (isOpen[i] && allocatedFrames[i] &&
appenders[i].getTupleCount() > 0) {
- appenders[i].write(pWriters[i], true);
- }
- } catch (Exception e) {
- newFailure = true;
- closeException = wrapException(closeException, e);
- break;
- }
- }
- if (newFailure) {
- try {
- fail(); // Fail all writers if any new failure happens.
- } catch (Exception e) {
- closeException = wrapException(closeException, e);
- }
- }
- }
- for (int i = 0; i < pWriters.length; ++i) {
- if (isOpen[i]) {
- // The try-block make sures that every writer is closed.
- try {
- pWriters[i].close();
- } catch (Exception e) {
- closeException = wrapException(closeException, e);
- }
- }
- }
- if (closeException != null) {
- throw closeException;
- }
}
+ /**
+ * Opens the base writer first and then initializes the partition computer.
+ */
@Override
public void open() throws HyracksDataException {
+ super.open();
tpc.initialize();
- for (int i = 0; i < pWriters.length; ++i) {
- isOpen[i] = true;
- pWriters[i].open();
- }
}
+ /**
+ * Appends the tuple at {@code tupleIndex} to the single partition selected by the partition
+ * computer.
+ */
@Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- tupleAccessor.reset(buffer);
- int tupleCount = tupleAccessor.getTupleCount();
- for (int i = 0; i < tupleCount; ++i) {
- int h = tpc.partition(tupleAccessor, i, consumerPartitionCount);
- if (!allocatedFrames[h]) {
- allocateFrames(h);
- }
- FrameUtils.appendToWriter(pWriters[h], appenders[h],
tupleAccessor, i);
- }
- }
-
- protected void allocateFrames(int i) throws HyracksDataException {
- appenders[i].reset(new VSizeFrame(ctx), true);
- allocatedFrames[i] = true;
- }
-
- @Override
- public void fail() throws HyracksDataException {
- failed = true;
- HyracksDataException failException = null;
- for (int i = 0; i < appenders.length; ++i) {
- if (isOpen[i]) {
- try {
- pWriters[i].fail();
- } catch (Exception e) {
- failException = wrapException(failException, e);
- }
- }
- }
- if (failException != null) {
- throw failException;
- }
- }
-
- @Override
- public void flush() throws HyracksDataException {
- for (int i = 0; i < consumerPartitionCount; i++) {
- if (allocatedFrames[i]) {
- appenders[i].flush(pWriters[i]);
- }
- }
- }
-
- public void flush(ITracer tracer, String name, long cat, String args)
throws HyracksDataException {
- for (int i = 0; i < consumerPartitionCount; i++) {
- if (allocatedFrames[i]) {
- appenders[i].flush(pWriters[i], tracer, name, cat, args);
- }
- }
- }
-
- // Wraps the current encountered exception into the final exception.
- private HyracksDataException wrapException(HyracksDataException
finalException, Exception currentException) {
- if (finalException == null) {
- return HyracksDataException.create(currentException);
- }
- finalException.addSuppressed(currentException);
- return finalException;
+ protected void processTuple(int tupleIndex) throws HyracksDataException {
+ int p = tpc.partition(tupleAccessor, tupleIndex,
consumerPartitionCount);
+ appendToPartitionWriter(tupleIndex, p);
}
}