Repository: asterixdb
Updated Branches:
  refs/heads/master adfb63361 -> 80225e2c2


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ForwardOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ForwardOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ForwardOperatorDescriptor.java
new file mode 100644
index 0000000..24c5cae
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ForwardOperatorDescriptor.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.misc;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.ActivityId;
+import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.TaskId;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import 
org.apache.hyracks.dataflow.common.data.marshalling.ByteArraySerializerDeserializer;
+import 
org.apache.hyracks.dataflow.common.data.marshalling.IntArraySerializerDeserializer;
+import 
org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;
+import org.apache.hyracks.dataflow.common.utils.TaskUtil;
+import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
+import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
+import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
/**
 * A two-input, one-output operator that forwards its data input (in-port 0) unchanged, but only after a
 * range map has arrived on its side input (in-port 1). The range map is deserialized and deposited into the
 * shared task context under {@code rangeMapKeyInContext} so that a downstream range-based partitioner
 * (MToN-partition connector) can pick it up. The ordering guarantee is enforced by a blocking edge between
 * the two internal activities (see {@link #contributeActivities(IActivityGraphBuilder)}).
 */
public class ForwardOperatorDescriptor extends AbstractOperatorDescriptor {
    private static final long serialVersionUID = 1L;
    // Activity ids within this operator: 0 = the data-forwarding activity, 1 = the range map reader activity.
    private static final int FORWARD_DATA_ACTIVITY_ID = 0;
    private static final int RANGEMAP_READER_ACTIVITY_ID = 1;
    // Key under which the deserialized RangeMap is published in the shared task context (via TaskUtil.put).
    private final String rangeMapKeyInContext;

    /**
     * @param spec used to create the operator id.
     * @param rangeMapKeyInContext the unique key to store the range map in the shared map & transfer it to partitioner.
     * @param outputRecordDescriptor the output schema of this operator.
     */
    public ForwardOperatorDescriptor(IOperatorDescriptorRegistry spec, String rangeMapKeyInContext,
            RecordDescriptor outputRecordDescriptor) {
        // 2 inputs (port 0 = data to forward, port 1 = serialized range map), 1 output (the forwarded data).
        super(spec, 2, 1);
        this.rangeMapKeyInContext = rangeMapKeyInContext;
        outRecDescs[0] = outputRecordDescriptor;
    }

    /**
     * Builds the internal activity graph: the range map reader consumes the operator's in-port 1; the forward
     * activity consumes in-port 0 and produces out-port 0. A blocking edge makes the forward activity wait
     * until the range map reader has finished (i.e. the range map is available in the task state).
     */
    @Override
    public void contributeActivities(IActivityGraphBuilder builder) {
        ForwardDataActivity forwardDataActivity =
                new ForwardDataActivity(new ActivityId(odId, FORWARD_DATA_ACTIVITY_ID));
        RangeMapReaderActivity rangeMapReaderActivity =
                new RangeMapReaderActivity(new ActivityId(odId, RANGEMAP_READER_ACTIVITY_ID));

        // range map reader activity, its input is coming through the operator's in-port = 1 & activity's in-port = 0
        builder.addActivity(this, rangeMapReaderActivity);
        builder.addSourceEdge(1, rangeMapReaderActivity, 0);

        // forward data activity, its input is coming through the operator's in-port = 0 & activity's in-port = 0
        builder.addActivity(this, forwardDataActivity);
        builder.addSourceEdge(0, forwardDataActivity, 0);

        // forward data activity will wait for the range map reader activity
        builder.addBlockingEdge(rangeMapReaderActivity, forwardDataActivity);

        // data leaves from the operator's out-port = 0 & forward data activity's out-port = 0
        builder.addTargetEdge(0, forwardDataActivity, 0);
    }

    /**
     * Internal class that is used to transfer the {@link RangeMap} object between activities in different ctx but in
     * the same NC, from {@link RangeMapReaderActivity} to {@link ForwardDataActivity}. These activities will share
     * the {@link org.apache.hyracks.api.job.IOperatorEnvironment} of the {@link org.apache.hyracks.control.nc.Joblet}
     * where the range map will be stored.
     */
    private class RangeMapState extends AbstractStateObject {
        // The deserialized range map; set by RangeMapReaderActivityNodePushable.close().
        RangeMap rangeMap;

        private RangeMapState(JobId jobId, TaskId stateObjectKey) {
            super(jobId, stateObjectKey);
        }
    }

    /**
     * Range map reader activity. {@see {@link RangeMapReaderActivityNodePushable}}
     */
    private class RangeMapReaderActivity extends AbstractActivityNode {
        private static final long serialVersionUID = 1L;

        private RangeMapReaderActivity(ActivityId activityId) {
            super(activityId);
        }

        @Override
        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
                throws HyracksDataException {
            // Input descriptor describes the single-tuple frame carrying the serialized range map.
            RecordDescriptor inputRecordDescriptor = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
            return new RangeMapReaderActivityNodePushable(ctx, inputRecordDescriptor, getActivityId(), partition);
        }
    }

    /**
     * Forward data activity. {@see {@link ForwardDataActivityNodePushable}}
     */
    private class ForwardDataActivity extends AbstractActivityNode {
        private static final long serialVersionUID = 1L;

        private ForwardDataActivity(ActivityId activityId) {
            super(activityId);
        }

        @Override
        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
                throws HyracksDataException {
            return new ForwardDataActivityNodePushable(ctx, partition);
        }
    }

    /**
     * Sink pushable that receives a single-tuple frame containing a serialized {@link RangeMap},
     * deserializes its three components (field count, split values, split value end offsets), and on
     * {@code close()} stores the reconstructed range map in the task state so {@link ForwardDataActivityNodePushable}
     * can retrieve it.
     */
    private class RangeMapReaderActivityNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {
        private final FrameTupleAccessor frameTupleAccessor;
        private final FrameTupleReference frameTupleReference;
        private final IHyracksTaskContext ctx;
        private final ActivityId activityId;
        private final int partition;
        // Deserialized range map components; remain unset (0 / null) until a frame arrives in nextFrame().
        private int numFields;
        private byte[] splitValues;
        private int[] splitValuesEndOffsets;

        private RangeMapReaderActivityNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecordDescriptor,
                ActivityId activityId, int partition) {
            this.ctx = ctx;
            this.frameTupleAccessor = new FrameTupleAccessor(inputRecordDescriptor);
            this.frameTupleReference = new FrameTupleReference();
            this.activityId = activityId;
            this.partition = partition;
        }

        @Override
        public void open() throws HyracksDataException {
            // this activity does not have a consumer to open (it's a sink), and nothing to initialize
        }

        /**
         * Deserializes the range map out of the incoming frame. The frame must contain exactly one tuple
         * (produced by a range map computer function); field 0 of that tuple holds the serialized payload:
         * an int (number of fields), a byte[] (split values), and an int[] (end offsets), in that order.
         */
        @Override
        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
            // "buffer" contains the serialized range map sent by a range map computer function.
            // deserialize the range map
            frameTupleAccessor.reset(buffer);
            if (frameTupleAccessor.getTupleCount() != 1) {
                // a range map frame with anything other than a single tuple is malformed
                throw HyracksDataException.create(ErrorCode.ONE_TUPLE_RANGEMAP_EXPECTED, sourceLoc);
            }
            frameTupleReference.reset(frameTupleAccessor, 0);
            byte[] rangeMap = frameTupleReference.getFieldData(0);
            int offset = frameTupleReference.getFieldStart(0);
            int length = frameTupleReference.getFieldLength(0);

            // read the three serialized components back in the order they were written
            ByteArrayInputStream rangeMapIn = new ByteArrayInputStream(rangeMap, offset, length);
            DataInputStream dataInputStream = new DataInputStream(rangeMapIn);
            numFields = IntegerSerializerDeserializer.read(dataInputStream);
            splitValues = ByteArraySerializerDeserializer.read(dataInputStream);
            splitValuesEndOffsets = IntArraySerializerDeserializer.read(dataInputStream);
        }

        @Override
        public void fail() throws HyracksDataException {
            // it's a sink node pushable, nothing to fail
        }

        /**
         * Validates that a range map was actually received and stores it in the task state object, keyed by
         * (this activity id, partition) so the forward activity of the same partition can look it up.
         */
        @Override
        public void close() throws HyracksDataException {
            // expecting a range map; fields are still unset if nextFrame() was never called
            if (numFields <= 0 || splitValues == null || splitValuesEndOffsets == null) {
                throw HyracksDataException.create(ErrorCode.NO_RANGEMAP_PRODUCED, sourceLoc);
            }
            // store the range map in the state object of ctx so that next activity (forward) could retrieve it
            TaskId rangeMapReaderTaskId = new TaskId(activityId, partition);
            RangeMapState rangeMapState = new RangeMapState(ctx.getJobletContext().getJobId(), rangeMapReaderTaskId);
            rangeMapState.rangeMap = new RangeMap(numFields, splitValues, splitValuesEndOffsets);
            ctx.setStateObject(rangeMapState);
        }
    }

    /**
     * Pass-through pushable: on open it publishes the range map (stored earlier by the range map reader of
     * the same partition) into the shared task context, then forwards every frame, flush, fail, and close
     * to the downstream writer unchanged.
     */
    private class ForwardDataActivityNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
        private final IHyracksTaskContext ctx;
        private final int partition;

        /**
         * @param ctx used to retrieve the range map stored by the range reader activity.
         * @param partition used to create the same task id used by the range reader activity for storing the range.
         */
        private ForwardDataActivityNodePushable(IHyracksTaskContext ctx, int partition) {
            this.ctx = ctx;
            this.partition = partition;
        }

        @Override
        public void open() throws HyracksDataException {
            // retrieve the range map from the state object (previous activity should have already stored it)
            // then deposit it into the ctx so that MToN-partition can pick it up
            Object stateObjKey = new TaskId(new ActivityId(odId, RANGEMAP_READER_ACTIVITY_ID), partition);
            RangeMapState rangeMapState = (RangeMapState) ctx.getStateObject(stateObjKey);
            TaskUtil.put(rangeMapKeyInContext, rangeMapState.rangeMap, ctx);
            writer.open();
        }

        @Override
        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
            // pure pass-through: forward the frame as-is
            writer.nextFrame(buffer);
        }

        @Override
        public void fail() throws HyracksDataException {
            writer.fail();
        }

        @Override
        public void close() throws HyracksDataException {
            writer.close();
        }

        @Override
        public void flush() throws HyracksDataException {
            writer.flush();
        }
    }
}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
index 4a77b3c..0bead97 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
@@ -111,7 +111,7 @@ public abstract class AbstractExternalSortRunMerger {
 
                 int stop = runs.size();
                 currentGenerationRunAvailable.set(0, stop);
-
+                int numberOfPasses = 1;
                 while (true) {
 
                     int unUsed = selectPartialRuns(maxMergeWidth * 
ctx.getInitialFrameSize(), runs, partialRuns,
@@ -147,7 +147,7 @@ public abstract class AbstractExternalSortRunMerger {
                         runs.add(reader);
 
                         if (currentGenerationRunAvailable.isEmpty()) {
-
+                            numberOfPasses++;
                             if (LOGGER.isDebugEnabled()) {
                                 LOGGER.debug("generated runs:" + stop);
                             }
@@ -157,7 +157,10 @@ public abstract class AbstractExternalSortRunMerger {
                             stop = runs.size();
                         }
                     } else {
-                        LOGGER.debug("final runs: {}", stop);
+                        if (LOGGER.isDebugEnabled()) {
+                            LOGGER.debug("final runs: {}", stop);
+                            LOGGER.debug("number of passes: " + 
numberOfPasses);
+                        }
                         merge(finalWriter, partialRuns);
                         break;
                     }

Reply via email to