This is an automated email from the ASF dual-hosted git repository.

alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bd95af  [NO ISSUE][RT] Randomized Sampling
7bd95af is described below

commit 7bd95afc6cf162d2d043bc9313c6665c641c0841
Author: mikhail <[email protected]>
AuthorDate: Thu Sep 10 15:52:50 2020 -0700

    [NO ISSUE][RT] Randomized Sampling
    
    -user model changes: no
    -storage format changes: no
    -interface changes: no
    
    Details:
     - LocalSamplingAggregateDescriptor now takes
       random samples instead of the first 100
    
    Change-Id: I8889f49b127773d90c6ef8a9f09d8993a4da68ac
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/4223
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Ali Alsuliman <[email protected]>
---
 .../std/LocalSamplingAggregateDescriptor.java      | 44 +++++++++++++++-------
 1 file changed, 31 insertions(+), 13 deletions(-)

diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSamplingAggregateDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSamplingAggregateDescriptor.java
index 5e0f0d9..51431d3 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSamplingAggregateDescriptor.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSamplingAggregateDescriptor.java
@@ -19,6 +19,8 @@
 package org.apache.asterix.runtime.aggregates.std;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Random;
 
 import org.apache.asterix.common.config.CompilerProperties;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
@@ -42,7 +44,6 @@ import 
org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.IntegerPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
@@ -97,7 +98,9 @@ public class LocalSamplingAggregateDescriptor extends 
AbstractAggregateFunctionD
         private final IPointable inputFieldValue = new VoidPointable();
         private final int numSamplesRequired;
         private final IScalarEvaluator[] sampledFieldsEval;
-        private int numSamples;
+        private final ArrayList<ArrayBackedValueStorage> samples = new 
ArrayList<>();
+        private final Random random = new Random();
+        private int count = 0;
 
         /**
          * @param args the fields that constitute a sample, e.g., $$1, $$2
@@ -119,10 +122,9 @@ public class LocalSamplingAggregateDescriptor extends 
AbstractAggregateFunctionD
 
         @Override
         public void init() throws HyracksDataException {
-            numSamples = 0;
+            samples.clear();
+            count = 0;
             rangeMapBits.reset();
-            // write a dummy integer at the beginning to be filled later with 
the actual number of samples taken
-            IntegerSerializerDeserializer.write(0, 
rangeMapBits.getDataOutput());
         }
 
         /**
@@ -132,15 +134,27 @@ public class LocalSamplingAggregateDescriptor extends 
AbstractAggregateFunctionD
          */
         @Override
         public void step(IFrameTupleReference tuple) throws 
HyracksDataException {
-            if (numSamples >= numSamplesRequired) {
+            count++;
+            if (samples.size() < numSamplesRequired) {
+                ArrayBackedValueStorage sampleStorage = new 
ArrayBackedValueStorage();
+                writeTupleKey(tuple, sampleStorage);
+                samples.add(sampleStorage);
                 return;
             }
-            for (int i = 0; i < sampledFieldsEval.length; i++) {
-                sampledFieldsEval[i].evaluate(tuple, inputFieldValue);
-                
IntegerSerializerDeserializer.write(inputFieldValue.getLength(), 
rangeMapBits.getDataOutput());
-                rangeMapBits.append(inputFieldValue);
+            int swap = random.nextInt(count);
+            if (swap < numSamplesRequired) {
+                writeTupleKey(tuple, samples.get(swap));
+            }
+        }
+
+        private void writeTupleKey(IFrameTupleReference tuple, 
ArrayBackedValueStorage storage)
+                throws HyracksDataException {
+            storage.reset();
+            for (IScalarEvaluator iScalarEvaluator : sampledFieldsEval) {
+                iScalarEvaluator.evaluate(tuple, inputFieldValue);
+                
IntegerSerializerDeserializer.write(inputFieldValue.getLength(), 
storage.getDataOutput());
+                storage.append(inputFieldValue);
             }
-            numSamples++;
         }
 
         /**
@@ -151,7 +165,7 @@ public class LocalSamplingAggregateDescriptor extends 
AbstractAggregateFunctionD
         @Override
         public void finish(IPointable result) throws HyracksDataException {
             storage.reset();
-            if (numSamples == 0) {
+            if (samples.size() == 0) {
                 // empty partition? then send system null as an indication of 
empty partition.
                 try {
                     
storage.getDataOutput().writeByte(ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG);
@@ -160,7 +174,11 @@ public class LocalSamplingAggregateDescriptor extends 
AbstractAggregateFunctionD
                     throw HyracksDataException.create(e);
                 }
             } else {
-                IntegerPointable.setInteger(rangeMapBits.getByteArray(), 
rangeMapBits.getStartOffset(), numSamples);
+                rangeMapBits.reset();
+                IntegerSerializerDeserializer.write(samples.size(), 
rangeMapBits.getDataOutput());
+                for (ArrayBackedValueStorage sample : samples) {
+                    rangeMapBits.append(sample);
+                }
                 binary.setValue(rangeMapBits.getByteArray(), 
rangeMapBits.getStartOffset(), rangeMapBits.getLength());
                 binarySerde.serialize(binary, storage.getDataOutput());
                 result.set(storage);

Reply via email to