This is an automated email from the ASF dual-hosted git repository.
alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7bd95af [NO ISSUE][RT] Randomized Sampling
7bd95af is described below
commit 7bd95afc6cf162d2d043bc9313c6665c641c0841
Author: mikhail <[email protected]>
AuthorDate: Thu Sep 10 15:52:50 2020 -0700
[NO ISSUE][RT] Randomized Sampling
-user model changes: no
-storage format changes: no
-interface changes: no
Details:
- LocalSamplingAggregateDescriptor now takes
random samples instead of the first 100
Change-Id: I8889f49b127773d90c6ef8a9f09d8993a4da68ac
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/4223
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Ali Alsuliman <[email protected]>
---
.../std/LocalSamplingAggregateDescriptor.java | 44 +++++++++++++++-------
1 file changed, 31 insertions(+), 13 deletions(-)
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSamplingAggregateDescriptor.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSamplingAggregateDescriptor.java
index 5e0f0d9..51431d3 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSamplingAggregateDescriptor.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSamplingAggregateDescriptor.java
@@ -19,6 +19,8 @@
package org.apache.asterix.runtime.aggregates.std;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Random;
import org.apache.asterix.common.config.CompilerProperties;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
@@ -42,7 +44,6 @@ import
org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.IntegerPointable;
import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
@@ -97,7 +98,9 @@ public class LocalSamplingAggregateDescriptor extends
AbstractAggregateFunctionD
private final IPointable inputFieldValue = new VoidPointable();
private final int numSamplesRequired;
private final IScalarEvaluator[] sampledFieldsEval;
- private int numSamples;
+ private final ArrayList<ArrayBackedValueStorage> samples = new
ArrayList<>();
+ private final Random random = new Random();
+ private int count = 0;
/**
* @param args the fields that constitute a sample, e.g., $$1, $$2
@@ -119,10 +122,9 @@ public class LocalSamplingAggregateDescriptor extends
AbstractAggregateFunctionD
@Override
public void init() throws HyracksDataException {
- numSamples = 0;
+ samples.clear();
+ count = 0;
rangeMapBits.reset();
- // write a dummy integer at the beginning to be filled later with
the actual number of samples taken
- IntegerSerializerDeserializer.write(0,
rangeMapBits.getDataOutput());
}
/**
@@ -132,15 +134,27 @@ public class LocalSamplingAggregateDescriptor extends
AbstractAggregateFunctionD
*/
@Override
public void step(IFrameTupleReference tuple) throws
HyracksDataException {
- if (numSamples >= numSamplesRequired) {
+ count++;
+ if (samples.size() < numSamplesRequired) {
+ ArrayBackedValueStorage sampleStorage = new
ArrayBackedValueStorage();
+ writeTupleKey(tuple, sampleStorage);
+ samples.add(sampleStorage);
return;
}
- for (int i = 0; i < sampledFieldsEval.length; i++) {
- sampledFieldsEval[i].evaluate(tuple, inputFieldValue);
-
IntegerSerializerDeserializer.write(inputFieldValue.getLength(),
rangeMapBits.getDataOutput());
- rangeMapBits.append(inputFieldValue);
+ int swap = random.nextInt(count);
+ if (swap < numSamplesRequired) {
+ writeTupleKey(tuple, samples.get(swap));
+ }
+ }
+
+ private void writeTupleKey(IFrameTupleReference tuple,
ArrayBackedValueStorage storage)
+ throws HyracksDataException {
+ storage.reset();
+ for (IScalarEvaluator iScalarEvaluator : sampledFieldsEval) {
+ iScalarEvaluator.evaluate(tuple, inputFieldValue);
+
IntegerSerializerDeserializer.write(inputFieldValue.getLength(),
storage.getDataOutput());
+ storage.append(inputFieldValue);
}
- numSamples++;
}
/**
@@ -151,7 +165,7 @@ public class LocalSamplingAggregateDescriptor extends
AbstractAggregateFunctionD
@Override
public void finish(IPointable result) throws HyracksDataException {
storage.reset();
- if (numSamples == 0) {
+ if (samples.size() == 0) {
// empty partition? then send system null as an indication of
empty partition.
try {
storage.getDataOutput().writeByte(ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG);
@@ -160,7 +174,11 @@ public class LocalSamplingAggregateDescriptor extends
AbstractAggregateFunctionD
throw HyracksDataException.create(e);
}
} else {
- IntegerPointable.setInteger(rangeMapBits.getByteArray(),
rangeMapBits.getStartOffset(), numSamples);
+ rangeMapBits.reset();
+ IntegerSerializerDeserializer.write(samples.size(),
rangeMapBits.getDataOutput());
+ for (ArrayBackedValueStorage sample : samples) {
+ rangeMapBits.append(sample);
+ }
binary.setValue(rangeMapBits.getByteArray(),
rangeMapBits.getStartOffset(), rangeMapBits.getLength());
binarySerde.serialize(binary, storage.getDataOutput());
result.set(storage);