Repository: asterixdb
Updated Branches:
  refs/heads/master e7fa4b3fb -> d906bd89e


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d906bd89/asterixdb/asterix-fuzzyjoin/src/main/java/org/apache/asterix/fuzzyjoin/similarity/SimilarityFiltersJaccard.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-fuzzyjoin/src/main/java/org/apache/asterix/fuzzyjoin/similarity/SimilarityFiltersJaccard.java
 
b/asterixdb/asterix-fuzzyjoin/src/main/java/org/apache/asterix/fuzzyjoin/similarity/SimilarityFiltersJaccard.java
index 556c0b7..007e7dc 100644
--- 
a/asterixdb/asterix-fuzzyjoin/src/main/java/org/apache/asterix/fuzzyjoin/similarity/SimilarityFiltersJaccard.java
+++ 
b/asterixdb/asterix-fuzzyjoin/src/main/java/org/apache/asterix/fuzzyjoin/similarity/SimilarityFiltersJaccard.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.asterix.fuzzyjoin.similarity;
 
 import java.util.Arrays;
@@ -29,9 +28,6 @@ public class SimilarityFiltersJaccard implements 
SimilarityFilters {
         public int lengthR;
         public int hamming;
 
-        public Partition() {
-        }
-
         public Partition(int startL, int lengthL, int startR, int lengthR, int 
hamming) {
             this.startL = startL;
             this.lengthL = lengthL;
@@ -41,45 +37,58 @@ public class SimilarityFiltersJaccard implements 
SimilarityFilters {
         }
     }
 
-    /**
-     *
-     */
     private static final long serialVersionUID = 1L;
 
     private static final int MAX_DEPTH = 2;
 
-    public static int getLengthLowerBound(int length, float simThr) {
-        return (int) Math.ceil(simThr * length);
-    }
+    // Note that this epsilon may be unsafe when the string length is larger 
than 100.
+    public static final double EPSILON = 0.000001;
 
-    public static boolean passLengthFilter(int lengthX, int lengthY, float 
simThr) {
-        return getLengthLowerBound(lengthX, simThr) <= lengthY && lengthY <= 1 
/ simThr * lengthX;
+    public static int getLengthLowerBound(int length, double simThr) {
+        return safeCeilingDouble(simThr * length);
     }
 
-    protected float simThr;
+    /* Length filter derived from Jaccard(lengthX, lengthY) >= simThr
+     * 1. lengthX >= lengthY * simThr, or equivalently, lengthY <= 1 / simThr 
* lengthX
+     * 2. lengthY >= lengthX * simThr
+     * */
+    public static boolean passLengthFilter(int lengthX, int lengthY, double 
simThr) {
+        return getLengthLowerBound(lengthX, simThr) <= lengthY
+                && (lengthY < 1 / simThr * lengthX || Math.abs(lengthY - 1 / 
simThr * lengthX) < EPSILON);
+    }
 
-    protected float simThr100;
+    protected double simThr;
 
     public SimilarityFiltersJaccard(float similarityThreshold) {
         reset(similarityThreshold);
     }
 
     public int getIndexPrefixLength(int length) {
-        return length - (int) Math.ceil(2 * simThr100 / (100 + simThr100) * 
length) + 1;
+        return length - safeCeilingDouble(2 * simThr / (1 + simThr) * length) 
+ 1;
+    }
+
+    private static int safeCeilingDouble(double d) {
+        if (Math.abs(d - Math.floor(d)) < EPSILON) {
+            return (int) Math.floor(d);
+        } else {
+            return (int) Math.ceil(d);
+        }
     }
 
     public int getIntersectLowerBound(int lengthX, int lengthY) {
-        return (int) Math.ceil(simThr100 * (lengthX + lengthY) / (100 + 
simThr100));
+        return safeCeilingDouble(simThr * (lengthX + lengthY) / (1 + simThr));
     }
 
     public int getIntersectUpperBound(int noGramsCommon, int positionX, int 
positionY, int lengthX, int lengthY) {
         return noGramsCommon + Math.min(lengthX - positionX - 1, lengthY - 
positionY - 1);
     }
 
+    @Override
     public int getLengthLowerBound(int length) {
         return getLengthLowerBound(length, simThr);
     }
 
+    @Override
     public int getLengthUpperBound(int length) {
         return (int) Math.floor(1 / simThr * length);
     }
@@ -89,7 +98,7 @@ public class SimilarityFiltersJaccard implements 
SimilarityFilters {
         if (tokens[posL] > w) {
             p = posL;
         } else if (tokens[posR] < w) {
-            p = posR;
+            p = posR + 1;
         } else {
             p = Arrays.binarySearch(tokens, start, start + length, w);
         }
@@ -104,14 +113,15 @@ public class SimilarityFiltersJaccard implements 
SimilarityFilters {
         return new Partition(start, p - start, p, start + length - p, 1);
     }
 
+    @Override
     public int getPrefixLength(int length) {
         if (length == 0) {
             return 0;
         }
-        return length - (int) Math.ceil(simThr * length) + 1;
+        return length - safeCeilingDouble(simThr * length) + 1;
     }
 
-    public float getSimilarityThreshold() {
+    public double getSimilarityThreshold() {
         return simThr;
     }
 
@@ -137,15 +147,6 @@ public class SimilarityFiltersJaccard implements 
SimilarityFilters {
         }
         Partition partitionY = new Partition(startY, mid - startY, mid + 1, 
startY + lengthY - mid - 1, 0);
 
-        // Partition partitionX = getPartition(tokensX, startX, lengthX,
-        // tokensY[mid], Math.max(Math.min(mid + startX - startY, startX
-        // + lengthX - 1)
-        // - offset - Math.abs(lengthX - lengthY) * offsetL,
-        // startX), Math.min(Math.max(mid + startX - startY,
-        // startX)
-        // + offset + Math.abs(lengthX - lengthY) * offsetR,
-        // startX + lengthX - 1));
-
         Partition partitionX = getPartition(tokensX, startX, lengthX, 
tokensY[mid],
                 Math.max(mid + startX - startY - offset - lengthDiff * 
offsetL, startX),
                 Math.min(mid + startX - startY + offset + lengthDiff * 
offsetR, startX + lengthX - 1));
@@ -170,26 +171,26 @@ public class SimilarityFiltersJaccard implements 
SimilarityFilters {
         return hamming;
     }
 
+    @Override
     public boolean passLengthFilter(int lengthX, int lengthY) {
         return passLengthFilter(lengthX, lengthY, simThr);
     }
 
     /**
-     * @param noGramsCommon
-     *            number of grams in common
-     * @param positionX
-     *            position of the last gram in common on X
-     * @param positionY
-     *            position of the last gram in common on X
-     * @param lengthX
-     * @param lengthY
+     * @param noGramsCommon number of grams in common
+     * @param positionX     position of the last gram in common on X
+     * @param positionY     position of the last gram in common on Y
+     * @param lengthX       total length of X
+     * @param lengthY       total length of Y
      * @return
      */
+    @Override
     public boolean passPositionFilter(int noGramsCommon, int positionX, int 
lengthX, int positionY, int lengthY) {
         return getIntersectUpperBound(noGramsCommon, positionX, positionY, 
lengthX,
                 lengthY) >= getIntersectLowerBound(lengthX, lengthY);
     }
 
+    @Override
     public float passSimilarityFilter(final int[] tokensX, int startX, int 
lengthX, final int prefixLengthX,
             final int[] tokensY, int startY, int lengthY, final int 
prefixLengthY, final int intersectionSizePrefix) {
         final int length = lengthX;
@@ -206,12 +207,18 @@ public class SimilarityFiltersJaccard implements 
SimilarityFilters {
                         + SimilarityMetric.getIntersectSize(tokensX, startX + 
prefixLengthX, lengthX - prefixLengthX,
                                 tokensY, startY + intersectionSizePrefix, 
lengthY - intersectionSizePrefix);
             }
-        } else {
+        } else if (token > tokenProbe) {
             if (intersectionSizePrefix + lengthProbe - prefixLengthY >= 
intersectSizeLowerBound) {
                 intersectSize = intersectionSizePrefix + 
SimilarityMetric.getIntersectSize(tokensX,
                         startX + intersectionSizePrefix, lengthX - 
intersectionSizePrefix, tokensY,
                         startY + prefixLengthY, lengthY - prefixLengthY);
             }
+        } else {
+            if (intersectionSizePrefix + lengthProbe - prefixLengthY >= 
intersectSizeLowerBound) {
+                intersectSize =
+                        intersectionSizePrefix + 
SimilarityMetric.getIntersectSize(tokensX, startX + prefixLengthX,
+                                lengthX - prefixLengthX, tokensY, startY + 
prefixLengthY, lengthY - prefixLengthY);
+            }
         }
 
         if (intersectSize >= intersectSizeLowerBound) {
@@ -221,77 +228,37 @@ public class SimilarityFiltersJaccard implements 
SimilarityFilters {
     }
 
     /**
-     * @param tokensX
-     * @param prefixLengthX
-     * @param tokensY
-     * @param prefixLengthY
+     * @param tokensX                ordered list of the tokens in X
+     * @param prefixLengthX          prefix length of X derived from prefix 
filter based on simThr
+     * @param tokensY                ordered list of the tokens in Y
+     * @param prefixLengthY          prefix length of Y derived from prefix 
filter based on simThr
      * @param intersectionSizePrefix
-     * @return similarity if it is above or equal to the similarity threshold, 0
-     *         otherwise
+     * @return similarity if it is above or equal to the similarity threshold, 
0 otherwise
      */
+    @Override
     public float passSimilarityFilter(final int[] tokensX, final int 
prefixLengthX, final int[] tokensY,
             final int prefixLengthY, final int intersectionSizePrefix) {
-        // final int length = tokensX.length;
-        // final int token = tokensX[Math.min(prefixLengthX, tokensX.length) -
-        // 1];
-        // final int lengthProbe = tokensY.length;
-        // final int tokenProbe = tokensY[prefixLengthY - 1];
-        //
-        // final int intersectSizeLowerBound = getIntersectLowerBound(length,
-        // lengthProbe);
-        // int intersectSize = 0;
-        //
-        // if (token < tokenProbe) {
-        // if (intersectionSizePrefix + length - prefixLengthX >=
-        // intersectSizeLowerBound) {
-        // intersectSize = intersectionSizePrefix
-        // + SimilarityMetric.getIntersectSize(tokensX,
-        // prefixLengthX, tokensY, intersectionSizePrefix);
-        // }
-        // } else {
-        // if (intersectionSizePrefix + lengthProbe - prefixLengthY >=
-        // intersectSizeLowerBound) {
-        // intersectSize = intersectionSizePrefix
-        // + SimilarityMetric.getIntersectSize(tokensX,
-        // intersectionSizePrefix, tokensY, prefixLengthY);
-        // }
-        // }
-        //
-        // if (intersectSize >= intersectSizeLowerBound) {
-        // return ((float) intersectSize)
-        // / (length + lengthProbe - intersectSize);
-        // }
-        // return 0;
         return passSimilarityFilter(tokensX, 0, tokensX.length, prefixLengthX, 
tokensY, 0, tokensY.length,
                 prefixLengthY, intersectionSizePrefix);
     }
 
+    @Override
     public boolean passSuffixFilter(int[] tokensX, int tokensStartX, int 
tokensLengthX, int positionX, int[] tokensY,
             int tokensStartY, int tokensLengthY, int positionY) {
         int hammingMax = tokensLengthX + tokensLengthY
-                - 2 * (int) Math.ceil(simThr100 / (100 + simThr100) * 
(tokensLengthX + tokensLengthY))
+                - 2 * safeCeilingDouble(simThr / (1 + simThr) * (tokensLengthX 
+ tokensLengthY))
                 - (positionX + 1 + positionY + 1 - 2);
         int hamming = getSuffixFilter(tokensX, tokensStartX + positionX + 1, 
tokensLengthX - positionX - 1, tokensY,
                 tokensStartY + positionY + 1, tokensLengthY - positionY - 1, 
hammingMax, 1);
         return hamming <= hammingMax;
     }
 
+    @Override
     public boolean passSuffixFilter(int[] tokensX, int positionX, int[] 
tokensY, int positionY) {
-        // int hammingMax = tokensX.length
-        // + tokensY.length
-        // - 2
-        // * (int) Math.ceil(simThr100 / (100 + simThr100)
-        // * (tokensX.length + tokensY.length))
-        // - (positionX + 1 + positionY + 1 - 2);
-        // int hamming = getSuffixFilter(tokensX, positionX + 1, tokensX.length
-        // - positionX - 1, tokensY, positionY + 1, tokensY.length
-        // - positionY - 1, hammingMax, 1);
-        // return hamming <= hammingMax;
         return passSuffixFilter(tokensX, 0, tokensX.length, positionX, 
tokensY, 0, tokensY.length, positionY);
     }
 
     public void reset(float similarityThreshold) {
         simThr = similarityThreshold;
-        simThr100 = simThr * 100;
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d906bd89/asterixdb/asterix-fuzzyjoin/src/main/java/org/apache/asterix/runtime/evaluators/common/SimilarityJaccardPrefixEvaluator.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-fuzzyjoin/src/main/java/org/apache/asterix/runtime/evaluators/common/SimilarityJaccardPrefixEvaluator.java
 
b/asterixdb/asterix-fuzzyjoin/src/main/java/org/apache/asterix/runtime/evaluators/common/SimilarityJaccardPrefixEvaluator.java
index 23b9d50..6dd2c4d 100644
--- 
a/asterixdb/asterix-fuzzyjoin/src/main/java/org/apache/asterix/runtime/evaluators/common/SimilarityJaccardPrefixEvaluator.java
+++ 
b/asterixdb/asterix-fuzzyjoin/src/main/java/org/apache/asterix/runtime/evaluators/common/SimilarityJaccardPrefixEvaluator.java
@@ -25,7 +25,6 @@ import 
org.apache.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserial
 import 
org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
 import 
org.apache.asterix.dataflow.data.nontagged.serde.AUnorderedListSerializerDeserializer;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
-import org.apache.asterix.fuzzyjoin.IntArray;
 import org.apache.asterix.fuzzyjoin.similarity.PartialIntersect;
 import org.apache.asterix.fuzzyjoin.similarity.SimilarityFiltersJaccard;
 import org.apache.asterix.fuzzyjoin.similarity.SimilarityMetric;
@@ -45,6 +44,7 @@ import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IntArray;
 
 public class SimilarityJaccardPrefixEvaluator implements IScalarEvaluator {
     // assuming type indicator in serde format
@@ -184,6 +184,7 @@ public class SimilarityJaccardPrefixEvaluator implements 
IScalarEvaluator {
                     tokens2.add(token);
                 }
             }
+
             // pad tokens
             for (; i < length2; i++) {
                 tokens2.add(Integer.MAX_VALUE);
@@ -193,25 +194,18 @@ public class SimilarityJaccardPrefixEvaluator implements 
IScalarEvaluator {
             evalTokenPrefix.evaluate(tuple, inputVal);
             int tokenPrefix = 
ATypeHierarchy.getIntegerValue(BuiltinFunctions.SIMILARITY_JACCARD.getName(), 4,
                     inputVal.getByteArray(), inputVal.getStartOffset());
-
             //
             // -- - position filter - --
             //
             SimilarityMetric.getPartialIntersectSize(tokens1.get(), 0, 
tokens1.length(), tokens2.get(), 0,
                     tokens2.length(), tokenPrefix, parInter);
-            if (similarityFilters.passPositionFilter(parInter.intersectSize, 
parInter.posXStop, length1,
-                    parInter.posYStop, length2)) {
 
-                //
-                // -- - suffix filter - --
-                //
-                if (similarityFilters.passSuffixFilter(tokens1.get(), 0, 
tokens1.length(), parInter.posXStart,
-                        tokens2.get(), 0, tokens2.length(), 
parInter.posYStart)) {
-
-                    sim = 
similarityFilters.passSimilarityFilter(tokens1.get(), 0, tokens1.length(),
-                            parInter.posXStop + 1, tokens2.get(), 0, 
tokens2.length(), parInter.posYStop + 1,
-                            parInter.intersectSize);
-                }
+            if (similarityFilters.passPositionFilter(parInter.intersectSize, 
parInter.posXStop, length1,
+                    parInter.posYStop, length2)
+                    && similarityFilters.passSuffixFilter(tokens1.get(), 0, 
tokens1.length(), parInter.posXStart,
+                            tokens2.get(), 0, tokens2.length(), 
parInter.posYStart)) { // -- - suffix filter - --
+                sim = similarityFilters.passSimilarityFilter(tokens1.get(), 0, 
tokens1.length(), parInter.posXStop + 1,
+                        tokens2.get(), 0, tokens2.length(), parInter.posYStop 
+ 1, parInter.intersectSize);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d906bd89/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismUtilities.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismUtilities.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismUtilities.java
index b86e8e9..6a2a333 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismUtilities.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismUtilities.java
@@ -18,6 +18,9 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -25,7 +28,10 @@ import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 
 public class IsomorphismUtilities {
 
@@ -72,4 +78,62 @@ public class IsomorphismUtilities {
         return true;
     }
 
+    // Return an operator that produced the given PK variable.
+    private static ILogicalOperator getOpThatProducesPK(ILogicalOperator 
rootOp, LogicalVariable pkVar)
+            throws AlgebricksException {
+        ILogicalOperator prodOp = null;
+        boolean produced;
+        for (Mutable<ILogicalOperator> opRef : rootOp.getInputs()) {
+            produced = false;
+            List<LogicalVariable> producedVars = new ArrayList<>();
+            VariableUtilities.getProducedVariables(opRef.getValue(), 
producedVars);
+            if (producedVars.contains(pkVar)) {
+                prodOp = opRef.getValue();
+                produced = true;
+            } else if (opRef.getValue().hasInputs()) {
+                prodOp = getOpThatProducesPK(opRef.getValue(), pkVar);
+                if (prodOp != null) {
+                    produced = true;
+                }
+            }
+            if (produced) {
+                break;
+            }
+        }
+        return prodOp;
+    }
+
+    // Merge the cases where different PKs are derived from the same DATASOURCE
+    public static void mergeHomogeneousPK(ILogicalOperator op, 
List<LogicalVariable> pkVars)
+            throws AlgebricksException {
+        Map<LogicalVariable, ILogicalOperator> varOpMap = new HashMap<>();
+        for (LogicalVariable pk : pkVars) {
+            ILogicalOperator mOp = getOpThatProducesPK(op, pk);
+            if (mOp == null || 
!mOp.getOperatorTag().equals(LogicalOperatorTag.DATASOURCESCAN)) {
+                throw new AlgebricksException("Illegal variable production.");
+            }
+            varOpMap.put(pk, mOp);
+        }
+        // Find the isomorphic variables in pkVars by DataSource and use 
variableMapping to store each isomorphic pair.
+        // For any isomorphic pair <$i, $j>, use the $i that is closer to the 
beginning of pkVars as the key and $j as the value.
+        Map<LogicalVariable, LogicalVariable> variableMapping = new 
HashMap<>();
+        for (int i = 0; i < pkVars.size() - 1; i++) {
+            for (int j = i + 1; j < pkVars.size(); j++) {
+                IDataSource<?> leftSource = ((DataSourceScanOperator) 
(varOpMap.get(pkVars.get(i)))).getDataSource();
+                IDataSource<?> rightSource = ((DataSourceScanOperator) 
(varOpMap.get(pkVars.get(j)))).getDataSource();
+                if 
(leftSource.getId().toString().equals(rightSource.getId().toString())) {
+                    mapVariablesTopDown(varOpMap.get(pkVars.get(i)), 
varOpMap.get(pkVars.get(j)), variableMapping);
+                }
+            }
+        }
+        // Remove a key variable in pkVars if it has at least one isomorphic 
variable in variableMapping.
+        Iterator<LogicalVariable> itr = pkVars.iterator();
+        while (itr.hasNext()) {
+            LogicalVariable pk = itr.next();
+            if (variableMapping.containsKey(pk)) {
+                variableMapping.remove(pk);
+                itr.remove();
+            }
+        }
+    }
 }

Reply via email to