This is an automated email from the ASF dual-hosted git repository.

dlych pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new fba02b9  [NO ISSUE][COMP] Introduce connector for partial broadcasts
fba02b9 is described below

commit fba02b9f58a0ecfe0375739e33e2d5f237bb8a5d
Author: Stephen Ermshar <[email protected]>
AuthorDate: Tue Aug 13 22:26:47 2019 -0700

    [NO ISSUE][COMP] Introduce connector for partial broadcasts
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    - Introduce MToNPartialBroadcastConnectorDescriptor
    - Refactor FieldRangePartitionComputerFactory to
      use RangeMapSupplier
    
    Change-Id: I4a6f8f17d1709862300db7ab386161b4dfbfee5a
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/3524
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Dmitry Lychagin <[email protected]>
    Integration-Tests: Jenkins <[email protected]>
    Reviewed-by: Ali Alsuliman <[email protected]>
---
 .../physical/RangePartitionExchangePOperator.java  |  17 +--
 .../RangePartitionMergeExchangePOperator.java      |   6 +-
 .../value/ITupleMultiPartitionComputer.java        |  47 +++++++
 .../ITupleMultiPartitionComputerFactory.java}      |  20 +--
 ...erFactory.java => DynamicRangeMapSupplier.java} |  24 +---
 .../range/FieldRangePartitionComputerFactory.java  |  22 ++-
 ...nComputerFactory.java => RangeMapSupplier.java} |  20 +--
 ...terFactory.java => StaticRangeMapSupplier.java} |  14 +-
 ...riter.java => AbstractPartitionDataWriter.java} |  33 ++---
 .../MToNPartialBroadcastConnectorDescriptor.java   |  50 +++++++
 .../std/connectors/MultiPartitionDataWriter.java   |  54 ++++++++
 .../std/connectors/PartitionDataWriter.java        | 150 +--------------------
 12 files changed, 226 insertions(+), 231 deletions(-)

diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
index bb0081a..fe96d4f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
@@ -45,10 +45,11 @@ import 
org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
 import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
-import 
org.apache.hyracks.dataflow.common.data.partition.range.DynamicFieldRangePartitionComputerFactory;
+import 
org.apache.hyracks.dataflow.common.data.partition.range.DynamicRangeMapSupplier;
 import 
org.apache.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
 import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;
-import 
org.apache.hyracks.dataflow.common.data.partition.range.StaticFieldRangePartitionComputerFactory;
+import 
org.apache.hyracks.dataflow.common.data.partition.range.RangeMapSupplier;
+import 
org.apache.hyracks.dataflow.common.data.partition.range.StaticRangeMapSupplier;
 import 
org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
 
 public class RangePartitionExchangePOperator extends AbstractExchangePOperator 
{
@@ -119,14 +120,10 @@ public class RangePartitionExchangePOperator extends 
AbstractExchangePOperator {
             comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == 
OrderKind.ASC);
             i++;
         }
-        FieldRangePartitionComputerFactory partitionerFactory;
-        if (rangeMapIsComputedAtRunTime) {
-            partitionerFactory = new 
DynamicFieldRangePartitionComputerFactory(sortFields, comps, 
rangeMapKeyInContext,
-                    op.getSourceLocation());
-        } else {
-            partitionerFactory = new 
StaticFieldRangePartitionComputerFactory(sortFields, comps, rangeMap);
-        }
-
+        RangeMapSupplier rangeMapSupplier = rangeMapIsComputedAtRunTime
+                ? new DynamicRangeMapSupplier(rangeMapKeyInContext) : new 
StaticRangeMapSupplier(rangeMap);
+        FieldRangePartitionComputerFactory partitionerFactory =
+                new FieldRangePartitionComputerFactory(sortFields, comps, 
rangeMapSupplier, op.getSourceLocation());
         IConnectorDescriptor conn = new 
MToNPartitioningConnectorDescriptor(spec, partitionerFactory);
         return new Pair<>(conn, null);
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
index 15a2d8f..ba4a6b7 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
@@ -52,8 +52,9 @@ import 
org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
+import 
org.apache.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
 import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;
-import 
org.apache.hyracks.dataflow.common.data.partition.range.StaticFieldRangePartitionComputerFactory;
+import 
org.apache.hyracks.dataflow.common.data.partition.range.StaticRangeMapSupplier;
 import 
org.apache.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
 
 public class RangePartitionMergeExchangePOperator extends 
AbstractExchangePOperator {
@@ -138,7 +139,8 @@ public class RangePartitionMergeExchangePOperator extends 
AbstractExchangePOpera
             comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == 
OrderKind.ASC);
             i++;
         }
-        ITuplePartitionComputerFactory tpcf = new 
StaticFieldRangePartitionComputerFactory(sortFields, comps, rangeMap);
+        ITuplePartitionComputerFactory tpcf = new 
FieldRangePartitionComputerFactory(sortFields, comps,
+                new StaticRangeMapSupplier(rangeMap), op.getSourceLocation());
         IConnectorDescriptor conn = new 
MToNPartitioningMergingConnectorDescriptor(spec, tpcf, sortFields, comps, nkcf);
         return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleMultiPartitionComputer.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleMultiPartitionComputer.java
new file mode 100644
index 0000000..13c5ed8
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleMultiPartitionComputer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.dataflow.value;
+
+import java.util.BitSet;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface ITupleMultiPartitionComputer {
+    /**
+     * For the tuple (located at tIndex in the frame), it determines which 
target partitions (0,1,... nParts-1) the
+     * tuple should be sent/written to.
+     * @param accessor The accessor of the frame to access tuples
+     * @param tIndex The index of the tuple in consideration
+     * @param nParts The number of target partitions
+     * @return The chosen target partitions as dictated by the logic of the 
partition computer
+     * @throws HyracksDataException
+     */
+    BitSet partition(IFrameTupleAccessor accessor, int tIndex, int nParts) 
throws HyracksDataException;
+
+    /**
+     * Gives the data partitioner a chance to set up its environment before it 
starts partitioning tuples. This method
+     * should be called in the open() of {@link 
org.apache.hyracks.api.comm.IFrameWriter}. The default implementation
+     * is "do nothing".
+     * @throws HyracksDataException
+     */
+    default void initialize() throws HyracksDataException {
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleMultiPartitionComputerFactory.java
similarity index 56%
copy from hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java
copy to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleMultiPartitionComputerFactory.java
index b17c550..c8821c3 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleMultiPartitionComputerFactory.java
@@ -16,23 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.dataflow.common.data.partition.range;
 
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+package org.apache.hyracks.api.dataflow.value;
 
-public class StaticFieldRangePartitionComputerFactory extends 
FieldRangePartitionComputerFactory {
-    private static final long serialVersionUID = 1L;
-    private RangeMap rangeMap;
+import java.io.Serializable;
 
-    public StaticFieldRangePartitionComputerFactory(int[] rangeFields, 
IBinaryComparatorFactory[] comparatorFactories,
-            RangeMap rangeMap) {
-        super(rangeFields, comparatorFactories);
-        this.rangeMap = rangeMap;
-    }
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
-    @Override
-    protected RangeMap getRangeMap(IHyracksTaskContext hyracksTaskContext) {
-        return rangeMap;
-    }
+public interface ITupleMultiPartitionComputerFactory extends Serializable {
+    ITupleMultiPartitionComputer createPartitioner(IHyracksTaskContext 
hyracksTaskContext);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/DynamicFieldRangePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/DynamicRangeMapSupplier.java
similarity index 52%
rename from hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/DynamicFieldRangePartitionComputerFactory.java
rename to hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/DynamicRangeMapSupplier.java
index bc642a9..cfc4b82 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/DynamicFieldRangePartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/DynamicRangeMapSupplier.java
@@ -16,33 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.hyracks.dataflow.common.data.partition.range;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.exceptions.ErrorCode;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.dataflow.common.utils.TaskUtil;
 
-public class DynamicFieldRangePartitionComputerFactory extends 
FieldRangePartitionComputerFactory {
+public final class DynamicRangeMapSupplier implements RangeMapSupplier {
+
     private static final long serialVersionUID = 1L;
+
     private final String rangeMapKeyInContext;
-    private final SourceLocation sourceLocation;
 
-    public DynamicFieldRangePartitionComputerFactory(int[] rangeFields, 
IBinaryComparatorFactory[] comparatorFactories,
-            String rangeMapKeyInContext, SourceLocation sourceLocation) {
-        super(rangeFields, comparatorFactories);
+    public DynamicRangeMapSupplier(String rangeMapKeyInContext) {
         this.rangeMapKeyInContext = rangeMapKeyInContext;
-        this.sourceLocation = sourceLocation;
     }
 
-    @Override
-    protected RangeMap getRangeMap(IHyracksTaskContext hyracksTaskContext) 
throws HyracksDataException {
-        RangeMap rangeMap = TaskUtil.get(rangeMapKeyInContext, 
hyracksTaskContext);
-        if (rangeMap == null) {
-            throw HyracksDataException.create(ErrorCode.RANGEMAP_NOT_FOUND, 
sourceLocation);
-        }
-        return rangeMap;
+    public RangeMap getRangeMap(IHyracksTaskContext taskContext) {
+        return TaskUtil.get(rangeMapKeyInContext, taskContext);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
index 55d4420..1831a5f 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
@@ -24,22 +24,27 @@ import 
org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 
-public abstract class FieldRangePartitionComputerFactory implements 
ITuplePartitionComputerFactory {
+public final class FieldRangePartitionComputerFactory implements 
ITuplePartitionComputerFactory {
     private static final long serialVersionUID = 1L;
     private final int[] rangeFields;
-    private IBinaryComparatorFactory[] comparatorFactories;
+    private final IBinaryComparatorFactory[] comparatorFactories;
+    private final RangeMapSupplier rangeMapSupplier;
+    private final SourceLocation sourceLocation;
 
-    protected FieldRangePartitionComputerFactory(int[] rangeFields, 
IBinaryComparatorFactory[] comparatorFactories) {
+    public FieldRangePartitionComputerFactory(int[] rangeFields, 
IBinaryComparatorFactory[] comparatorFactories,
+            RangeMapSupplier rangeMapSupplier, SourceLocation sourceLocation) {
         this.rangeFields = rangeFields;
+        this.rangeMapSupplier = rangeMapSupplier;
         this.comparatorFactories = comparatorFactories;
+        this.sourceLocation = sourceLocation;
     }
 
-    protected abstract RangeMap getRangeMap(IHyracksTaskContext 
hyracksTaskContext) throws HyracksDataException;
-
     @Override
-    public ITuplePartitionComputer createPartitioner(IHyracksTaskContext 
hyracksTaskContext) {
+    public ITuplePartitionComputer createPartitioner(IHyracksTaskContext 
taskContext) {
         final IBinaryComparator[] comparators = new 
IBinaryComparator[comparatorFactories.length];
         for (int i = 0; i < comparatorFactories.length; ++i) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -50,7 +55,10 @@ public abstract class FieldRangePartitionComputerFactory 
implements ITuplePartit
 
             @Override
             public void initialize() throws HyracksDataException {
-                rangeMap = getRangeMap(hyracksTaskContext);
+                rangeMap = rangeMapSupplier.getRangeMap(taskContext);
+                if (rangeMap == null) {
+                    throw 
HyracksDataException.create(ErrorCode.RANGEMAP_NOT_FOUND, sourceLocation);
+                }
             }
 
             @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMapSupplier.java
similarity index 60%
copy from hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java
copy to hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMapSupplier.java
index b17c550..fc0911e 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMapSupplier.java
@@ -16,23 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.dataflow.common.data.partition.range;
 
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+package org.apache.hyracks.dataflow.common.data.partition.range;
 
-public class StaticFieldRangePartitionComputerFactory extends 
FieldRangePartitionComputerFactory {
-    private static final long serialVersionUID = 1L;
-    private RangeMap rangeMap;
+import java.io.Serializable;
 
-    public StaticFieldRangePartitionComputerFactory(int[] rangeFields, 
IBinaryComparatorFactory[] comparatorFactories,
-            RangeMap rangeMap) {
-        super(rangeFields, comparatorFactories);
-        this.rangeMap = rangeMap;
-    }
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
-    @Override
-    protected RangeMap getRangeMap(IHyracksTaskContext hyracksTaskContext) {
-        return rangeMap;
-    }
+public interface RangeMapSupplier extends Serializable {
+    RangeMap getRangeMap(IHyracksTaskContext taskContext);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticRangeMapSupplier.java
similarity index 69%
rename from hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java
rename to hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticRangeMapSupplier.java
index b17c550..613ccc5 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticRangeMapSupplier.java
@@ -16,23 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.hyracks.dataflow.common.data.partition.range;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 
-public class StaticFieldRangePartitionComputerFactory extends 
FieldRangePartitionComputerFactory {
+public final class StaticRangeMapSupplier implements RangeMapSupplier {
+
     private static final long serialVersionUID = 1L;
-    private RangeMap rangeMap;
 
-    public StaticFieldRangePartitionComputerFactory(int[] rangeFields, 
IBinaryComparatorFactory[] comparatorFactories,
-            RangeMap rangeMap) {
-        super(rangeFields, comparatorFactories);
+    private final RangeMap rangeMap;
+
+    public StaticRangeMapSupplier(RangeMap rangeMap) {
         this.rangeMap = rangeMap;
     }
 
     @Override
-    protected RangeMap getRangeMap(IHyracksTaskContext hyracksTaskContext) {
+    public RangeMap getRangeMap(IHyracksTaskContext taskContext) {
         return rangeMap;
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/AbstractPartitionDataWriter.java
similarity index 87%
copy from hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
copy to hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/AbstractPartitionDataWriter.java
index d06d5d3..03f260a 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/AbstractPartitionDataWriter.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.hyracks.dataflow.std.connectors;
 
 import java.io.IOException;
@@ -25,7 +26,6 @@ import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.IPartitionWriterFactory;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -33,21 +33,20 @@ import 
org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.util.trace.ITracer;
 
-public class PartitionDataWriter implements IFrameWriter {
-    private final int consumerPartitionCount;
+abstract class AbstractPartitionDataWriter implements IFrameWriter {
+
+    protected final int consumerPartitionCount;
     private final IFrameWriter[] pWriters;
     private final boolean[] isOpen;
     private final FrameTupleAppender[] appenders;
-    private final FrameTupleAccessor tupleAccessor;
-    private final ITuplePartitionComputer tpc;
-    private final IHyracksTaskContext ctx;
+    protected final FrameTupleAccessor tupleAccessor;
+    protected final IHyracksTaskContext ctx;
     private boolean[] allocatedFrames;
     private boolean failed = false;
 
-    public PartitionDataWriter(IHyracksTaskContext ctx, int 
consumerPartitionCount, IPartitionWriterFactory pwFactory,
-            RecordDescriptor recordDescriptor, ITuplePartitionComputer tpc) 
throws HyracksDataException {
+    public AbstractPartitionDataWriter(IHyracksTaskContext ctx, int 
consumerPartitionCount,
+            IPartitionWriterFactory pwFactory, RecordDescriptor 
recordDescriptor) throws HyracksDataException {
         this.ctx = ctx;
-        this.tpc = tpc;
         this.consumerPartitionCount = consumerPartitionCount;
         pWriters = new IFrameWriter[consumerPartitionCount];
         isOpen = new boolean[consumerPartitionCount];
@@ -113,7 +112,6 @@ public class PartitionDataWriter implements IFrameWriter {
 
     @Override
     public void open() throws HyracksDataException {
-        tpc.initialize();
         for (int i = 0; i < pWriters.length; ++i) {
             isOpen[i] = true;
             pWriters[i].open();
@@ -125,12 +123,17 @@ public class PartitionDataWriter implements IFrameWriter {
         tupleAccessor.reset(buffer);
         int tupleCount = tupleAccessor.getTupleCount();
         for (int i = 0; i < tupleCount; ++i) {
-            int h = tpc.partition(tupleAccessor, i, consumerPartitionCount);
-            if (!allocatedFrames[h]) {
-                allocateFrames(h);
-            }
-            FrameUtils.appendToWriter(pWriters[h], appenders[h], 
tupleAccessor, i);
+            processTuple(i);
+        }
+    }
+
+    protected abstract void processTuple(int tupleIndex) throws 
HyracksDataException;
+
+    protected void appendToPartitionWriter(int tupleIndex, int partition) 
throws HyracksDataException {
+        if (!allocatedFrames[partition]) {
+            allocateFrames(partition);
         }
+        FrameUtils.appendToWriter(pWriters[partition], appenders[partition], 
tupleAccessor, tupleIndex);
     }
 
     protected void allocateFrames(int i) throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartialBroadcastConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartialBroadcastConnectorDescriptor.java
new file mode 100644
index 0000000..5d79f8c
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartialBroadcastConnectorDescriptor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.connectors;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.IPartitionWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import 
org.apache.hyracks.api.dataflow.value.ITupleMultiPartitionComputerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
+
+public class MToNPartialBroadcastConnectorDescriptor extends 
AbstractMToNConnectorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    protected ITupleMultiPartitionComputerFactory tpcf;
+
+    public 
MToNPartialBroadcastConnectorDescriptor(IConnectorDescriptorRegistry spec,
+            ITupleMultiPartitionComputerFactory tpcf) {
+        super(spec);
+        this.tpcf = tpcf;
+    }
+
+    @Override
+    public IFrameWriter createPartitioner(IHyracksTaskContext ctx, 
RecordDescriptor recordDesc,
+            IPartitionWriterFactory edwFactory, int index, int 
nProducerPartitions, int nConsumerPartitions)
+            throws HyracksDataException {
+        return new MultiPartitionDataWriter(ctx, nConsumerPartitions, 
edwFactory, recordDesc,
+                tpcf.createPartitioner(ctx));
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MultiPartitionDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MultiPartitionDataWriter.java
new file mode 100644
index 0000000..aed39df
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MultiPartitionDataWriter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.connectors;
+
+import java.util.BitSet;
+
+import org.apache.hyracks.api.comm.IPartitionWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITupleMultiPartitionComputer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class MultiPartitionDataWriter extends AbstractPartitionDataWriter {
+
+    private final ITupleMultiPartitionComputer tpc;
+
+    public MultiPartitionDataWriter(IHyracksTaskContext ctx, int 
consumerPartitionCount,
+            IPartitionWriterFactory pwFactory, RecordDescriptor 
recordDescriptor, ITupleMultiPartitionComputer tpc)
+            throws HyracksDataException {
+        super(ctx, consumerPartitionCount, pwFactory, recordDescriptor);
+        this.tpc = tpc;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        super.open();
+        tpc.initialize();
+    }
+
+    @Override
+    protected void processTuple(int tupleIndex) throws HyracksDataException {
+        BitSet partitionSet = tpc.partition(tupleAccessor, tupleIndex, 
consumerPartitionCount);
+        for (int p = partitionSet.nextSetBit(0); p >= 0; p = 
partitionSet.nextSetBit(p + 1)) {
+            appendToPartitionWriter(tupleIndex, p);
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
index d06d5d3..e67f9a8 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
@@ -18,167 +18,31 @@
  */
 package org.apache.hyracks.dataflow.std.connectors;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.IPartitionWriterFactory;
-import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.util.trace.ITracer;
 
-public class PartitionDataWriter implements IFrameWriter {
-    private final int consumerPartitionCount;
-    private final IFrameWriter[] pWriters;
-    private final boolean[] isOpen;
-    private final FrameTupleAppender[] appenders;
-    private final FrameTupleAccessor tupleAccessor;
+public class PartitionDataWriter extends AbstractPartitionDataWriter {
+
     private final ITuplePartitionComputer tpc;
-    private final IHyracksTaskContext ctx;
-    private boolean[] allocatedFrames;
-    private boolean failed = false;
 
     public PartitionDataWriter(IHyracksTaskContext ctx, int consumerPartitionCount, IPartitionWriterFactory pwFactory,
             RecordDescriptor recordDescriptor, ITuplePartitionComputer tpc) throws HyracksDataException {
-        this.ctx = ctx;
+        super(ctx, consumerPartitionCount, pwFactory, recordDescriptor);
         this.tpc = tpc;
-        this.consumerPartitionCount = consumerPartitionCount;
-        pWriters = new IFrameWriter[consumerPartitionCount];
-        isOpen = new boolean[consumerPartitionCount];
-        allocatedFrames = new boolean[consumerPartitionCount];
-        appenders = new FrameTupleAppender[consumerPartitionCount];
-        tupleAccessor = new FrameTupleAccessor(recordDescriptor);
-        initializeAppenders(pwFactory);
-    }
-
-    protected void initializeAppenders(IPartitionWriterFactory pwFactory) throws HyracksDataException {
-        for (int i = 0; i < consumerPartitionCount; ++i) {
-            try {
-                pWriters[i] = pwFactory.createFrameWriter(i);
-                appenders[i] = createTupleAppender(ctx);
-            } catch (IOException e) {
-                throw HyracksDataException.create(e);
-            }
-        }
-    }
-
-    protected FrameTupleAppender createTupleAppender(IHyracksTaskContext ctx) {
-        return new FrameTupleAppender();
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        HyracksDataException closeException = null;
-        if (!failed) {
-            boolean newFailure = false;
-            for (int i = 0; i < pWriters.length; ++i) {
-                try {
-                    if (isOpen[i] && allocatedFrames[i] && appenders[i].getTupleCount() > 0) {
-                        appenders[i].write(pWriters[i], true);
-                    }
-                } catch (Exception e) {
-                    newFailure = true;
-                    closeException = wrapException(closeException, e);
-                    break;
-                }
-            }
-            if (newFailure) {
-                try {
-                    fail(); // Fail all writers if any new failure happens.
-                } catch (Exception e) {
-                    closeException = wrapException(closeException, e);
-                }
-            }
-        }
-        for (int i = 0; i < pWriters.length; ++i) {
-            if (isOpen[i]) {
-                // The try-block make sures that every writer is closed.
-                try {
-                    pWriters[i].close();
-                } catch (Exception e) {
-                    closeException = wrapException(closeException, e);
-                }
-            }
-        }
-        if (closeException != null) {
-            throw closeException;
-        }
     }
 
     @Override
     public void open() throws HyracksDataException {
+        super.open();
         tpc.initialize();
-        for (int i = 0; i < pWriters.length; ++i) {
-            isOpen[i] = true;
-            pWriters[i].open();
-        }
     }
 
     @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        tupleAccessor.reset(buffer);
-        int tupleCount = tupleAccessor.getTupleCount();
-        for (int i = 0; i < tupleCount; ++i) {
-            int h = tpc.partition(tupleAccessor, i, consumerPartitionCount);
-            if (!allocatedFrames[h]) {
-                allocateFrames(h);
-            }
-            FrameUtils.appendToWriter(pWriters[h], appenders[h], tupleAccessor, i);
-        }
-    }
-
-    protected void allocateFrames(int i) throws HyracksDataException {
-        appenders[i].reset(new VSizeFrame(ctx), true);
-        allocatedFrames[i] = true;
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-        failed = true;
-        HyracksDataException failException = null;
-        for (int i = 0; i < appenders.length; ++i) {
-            if (isOpen[i]) {
-                try {
-                    pWriters[i].fail();
-                } catch (Exception e) {
-                    failException = wrapException(failException, e);
-                }
-            }
-        }
-        if (failException != null) {
-            throw failException;
-        }
-    }
-
-    @Override
-    public void flush() throws HyracksDataException {
-        for (int i = 0; i < consumerPartitionCount; i++) {
-            if (allocatedFrames[i]) {
-                appenders[i].flush(pWriters[i]);
-            }
-        }
-    }
-
-    public void flush(ITracer tracer, String name, long cat, String args) throws HyracksDataException {
-        for (int i = 0; i < consumerPartitionCount; i++) {
-            if (allocatedFrames[i]) {
-                appenders[i].flush(pWriters[i], tracer, name, cat, args);
-            }
-        }
-    }
-
-    // Wraps the current encountered exception into the final exception.
-    private HyracksDataException wrapException(HyracksDataException finalException, Exception currentException) {
-        if (finalException == null) {
-            return HyracksDataException.create(currentException);
-        }
-        finalException.addSuppressed(currentException);
-        return finalException;
+    protected void processTuple(int tupleIndex) throws HyracksDataException {
+        int p = tpc.partition(tupleAccessor, tupleIndex, consumerPartitionCount);
+        appendToPartitionWriter(tupleIndex, p);
     }
 }

Reply via email to