[ 
https://issues.apache.org/jira/browse/BEAM-8468?focusedWorklogId=335777&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-335777
 ]

ASF GitHub Bot logged work on BEAM-8468:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/Oct/19 21:34
            Start Date: 29/Oct/19 21:34
    Worklog Time Spent: 10m 
      Work Description: 11moon11 commented on pull request #9863: [BEAM-8468] Predicate push down for in memory table
URL: https://github.com/apache/beam/pull/9863#discussion_r340337011
 
 

 ##########
 File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
 ##########
 @@ -213,6 +242,121 @@ public boolean supportsProjects() {
     public Schema getSchema() {
       return tableWithRows.table.getSchema();
     }
+
+    /**
+     * A helper method to create a {@code Filter} from {@code RexNode}.
+     *
+     * @param node {@code RexNode} to create a filter from.
+     * @return {@code Filter} PTransform.
+     */
+    private PTransform<PCollection<Row>, PCollection<Row>> 
filterFromNode(RexNode node) {
+      List<RexNode> operands = new ArrayList<>();
+      List<Integer> fieldIds = new ArrayList<>();
+      List<RexLiteral> literals = new ArrayList<>();
+      List<RexInputRef> inputRefs = new ArrayList<>();
+
+      if (node instanceof RexCall) {
+        operands.addAll(((RexCall) node).getOperands());
+      } else if (node instanceof RexInputRef) {
+        operands.add(node);
+        operands.add(RexLiteral.fromJdbcString(node.getType(), 
SqlTypeName.BOOLEAN, "true"));
+      } else {
+        throw new RuntimeException(
+            "Was expecting a RexCall or a boolean RexInputRef, but received: "
+                + node.getClass().getSimpleName());
+      }
+
+      for (RexNode operand : operands) {
+        if (operand instanceof RexInputRef) {
+          RexInputRef inputRef = (RexInputRef) operand;
+          fieldIds.add(inputRef.getIndex());
+          inputRefs.add(inputRef);
+        } else if (operand instanceof RexLiteral) {
+          RexLiteral literal = (RexLiteral) operand;
+          literals.add(literal);
+        } else {
+          throw new RuntimeException(
+              "Encountered an unexpected operand: " + 
operand.getClass().getSimpleName());
+        }
+      }
+
+      SerializableFunction<Integer, Boolean> comparison;
+      // TODO: add support for expressions like:
+      //  =(CAST($3):INTEGER NOT NULL, 200)
+      switch (node.getKind()) {
+        case LESS_THAN:
+          comparison = i -> i < 0;
+          break;
+        case GREATER_THAN:
+          comparison = i -> i > 0;
+          break;
+        case LESS_THAN_OR_EQUAL:
+          comparison = i -> i <= 0;
+          break;
+        case GREATER_THAN_OR_EQUAL:
+          comparison = i -> i >= 0;
+          break;
+        case EQUALS:
+        case INPUT_REF:
+          comparison = i -> i == 0;
+          break;
+        case NOT_EQUALS:
+          comparison = i -> i != 0;
+          break;
+        default:
+          throw new RuntimeException("Unsupported node kind: " + 
node.getKind().toString());
+      }
+
+      return Filter.<Row>create()
+          .whereFieldIds(
+              fieldIds, createFilter(operands, fieldIds, inputRefs, literals, 
comparison));
+    }
+
+    /**
+     * A helper method to create a serializable function comparing row fields.
+     *
+     * @param operands A list of operands used in a comparison.
+     * @param fieldIds A list of operand ids.
+     * @param inputRefs A list of operands, which are an instanceof {@code 
RexInputRef}.
+     * @param literals A list of operands, which are an instanceof {@code 
RexLiteral}.
+     * @param comparison A comparison to perform between operands.
+     * @return A filter comparing row fields to literals/other fields.
+     */
+    private SerializableFunction<Row, Boolean> createFilter(
+        List<RexNode> operands,
+        List<Integer> fieldIds,
+        List<RexInputRef> inputRefs,
+        List<RexLiteral> literals,
+        SerializableFunction<Integer, Boolean> comparison) {
+      if (inputRefs.size() == 2) { // Comparing 2 columns.
+        final int op0 = fieldIds.indexOf(inputRefs.get(0).getIndex());
+        final int op1 = fieldIds.indexOf(inputRefs.get(1).getIndex());
+        return row -> 
comparison.apply(row.<Comparable>getValue(op0).compareTo(op1));
+      } else { // Comparing a column to a literal.
 
 Review comment:
   Removed `else`.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 335777)
    Time Spent: 2h 20m  (was: 2h 10m)

> Add predicate/filter push-down capability to IO APIs
> ----------------------------------------------------
>
>                 Key: BEAM-8468
>                 URL: https://issues.apache.org/jira/browse/BEAM-8468
>             Project: Beam
>          Issue Type: New Feature
>          Components: dsl-sql
>            Reporter: Kirill Kozlov
>            Assignee: Kirill Kozlov
>            Priority: Major
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> * InMemoryTable should implement the following methods:
> {code:java}
> public PCollection<Row> buildIOReader(
>     PBegin begin, BeamSqlTableFilter filters, List<String> fieldNames);
> public BeamSqlTableFilter constructFilter(List<RexNode> filter);
> {code}
>  * Update a push-down rule to support predicate/filter push-down.
>  * Create a class
> {code:java}
> class TestTableFilter implements BeamSqlTableFilter{code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to