openinx commented on a change in pull request #1893:
URL: https://github.com/apache/iceberg/pull/1893#discussion_r551206609



##########
File path: 
flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
##########
@@ -75,32 +95,328 @@ public void clean() {
 
   @Test
   public void testLimitPushDown() {
-    sql("INSERT INTO %s  VALUES (1,'a'),(2,'b')", TABLE_NAME);
-
     String querySql = String.format("SELECT * FROM %s LIMIT 1", TABLE_NAME);
     String explain = getTableEnv().explainSql(querySql);
     String expectedExplain = "LimitPushDown : 1";
     Assert.assertTrue("explain should contains LimitPushDown", 
explain.contains(expectedExplain));
     List<Object[]> result = sql(querySql);
-    Assert.assertEquals("should have 1 record", 1, result.size());
-    Assert.assertArrayEquals("Should produce the expected records", 
result.get(0), new Object[] {1, "a"});
+    assertEquals("should have 1 record", 1, result.size());
+    assertArrayEquals("Should produce the expected records", result.get(0), 
new Object[] {1, "a"});
 
     AssertHelpers.assertThrows("Invalid limit number: -1 ", 
SqlParserException.class,
         () -> sql("SELECT * FROM %s LIMIT -1", TABLE_NAME));
 
-    Assert.assertEquals("should have 0 record", 0, sql("SELECT * FROM %s LIMIT 
0", TABLE_NAME).size());
+    assertEquals("should have 0 record", 0, sql("SELECT * FROM %s LIMIT 0", 
TABLE_NAME).size());
 
-    String sqlLimitExceed = String.format("SELECT * FROM %s LIMIT 3", 
TABLE_NAME);
+    String sqlLimitExceed = String.format("SELECT * FROM %s LIMIT 4", 
TABLE_NAME);
     List<Object[]> resultExceed = sql(sqlLimitExceed);
-    Assert.assertEquals("should have 2 record", 2, resultExceed.size());
+    assertEquals("should have 3 record", 3, resultExceed.size());
     List<Object[]> expectedList = Lists.newArrayList();
     expectedList.add(new Object[] {1, "a"});
     expectedList.add(new Object[] {2, "b"});
-    Assert.assertArrayEquals("Should produce the expected records", 
resultExceed.toArray(), expectedList.toArray());
+    expectedList.add(new Object[] {3, null});
+    assertArrayEquals("Should produce the expected records", 
resultExceed.toArray(), expectedList.toArray());
 
     String sqlMixed = String.format("SELECT * FROM %s WHERE id = 1 LIMIT 2", 
TABLE_NAME);
     List<Object[]> mixedResult = sql(sqlMixed);
-    Assert.assertEquals("should have 1 record", 1, mixedResult.size());
-    Assert.assertArrayEquals("Should produce the expected records", 
mixedResult.get(0), new Object[] {1, "a"});
+    assertEquals("should have 1 record", 1, mixedResult.size());
+    assertArrayEquals("Should produce the expected records", 
mixedResult.get(0), new Object[] {1, "a"});
+  }
+
+  @Test
+  public void testNoFilterPushDown() {
+    String sql = String.format("SELECT * FROM %s ", TABLE_NAME);
+    String explain = getTableEnv().explainSql(sql);
+    assertFalse("explain should no contains FilterPushDown", 
explain.contains(expectedFilterPushDownExplain));
+  }
+
+  @Test
+  public void testFilterPushDownEqual() {
+    String sqlLiteralRight = String.format("SELECT * FROM %s WHERE id = 1 ", 
TABLE_NAME);
+    String explain = getTableEnv().explainSql(sqlLiteralRight);
+    assertTrue("explain should contains FilterPushDown", 
explain.contains(expectedFilterPushDownExplain));
+
+    List<Object[]> result = sql(sqlLiteralRight);
+    assertEquals("should have 1 record", 1, result.size());
+    assertArrayEquals("Should produce the expected record", result.get(0), new 
Object[] {1, "a"});
+
+    assertEquals("Should create only one scan", 1, scanEventCount);
+    Assert.assertEquals("Should push down expected filter", "id = 1", 
FlinkUtil.describe(lastScanEvent.filter()));
+
+    // filter not push down
+    String sqlEqualNull = String.format("SELECT * FROM %s WHERE data = NULL ", 
TABLE_NAME);
+    String explainEqualNull = getTableEnv().explainSql(sqlEqualNull);
+    assertFalse("explain should not contains FilterPushDown", 
explainEqualNull.contains(expectedFilterPushDownExplain));
+  }
+
+  @Test
+  public void testFilterPushDownEqualLiteralOnLeft() {
+    String sqlLiteralLeft = String.format("SELECT * FROM %s WHERE 1 = id ", 
TABLE_NAME);
+    String explainLeft = getTableEnv().explainSql(sqlLiteralLeft);
+    assertTrue("explain should contains FilterPushDown", 
explainLeft.contains(expectedFilterPushDownExplain));
+
+    List<Object[]> resultLeft = sql(sqlLiteralLeft);
+    assertEquals("should have 1 record", 1, resultLeft.size());
+    assertArrayEquals("Should produce the expected record", resultLeft.get(0), 
new Object[] {1, "a"});
+
+    assertEquals("Should create only one scan", 1, scanEventCount);
+    Assert.assertEquals("Should push down expected filter", "id = 1", 
FlinkUtil.describe(lastScanEvent.filter()));
+  }
+
+  @Test
+  public void testFilterPushDownNoEqual() {
+    String sqlNE = String.format("SELECT * FROM %s WHERE id <> 1 ", 
TABLE_NAME);
+    String explainNE = getTableEnv().explainSql(sqlNE);
+    assertTrue("explain should contains FilterPushDown", 
explainNE.contains(expectedFilterPushDownExplain));
+
+    List<Object[]> resultNE = sql(sqlNE);
+    assertEquals("should have 2 record", 2, resultNE.size());
+    List<Object[]> expectedNE = Lists.newArrayList();
+    expectedNE.add(new Object[] {2, "b"});
+    expectedNE.add(new Object[] {3, null});
+    assertArrayEquals("Should produce the expected record", 
resultNE.toArray(), expectedNE.toArray());
+    assertEquals("Should create only one scan", 1, scanEventCount);
+    Assert.assertEquals("Should push down expected filter", "id != 1", 
FlinkUtil.describe(lastScanEvent.filter()));
+
+    String sqlNotEqualNull = String.format("SELECT * FROM %s WHERE data <> 
NULL ", TABLE_NAME);
+    String explainNotEqualNull = getTableEnv().explainSql(sqlNotEqualNull);
+    assertFalse("explain should not contains FilterPushDown", 
explainNotEqualNull.contains(
+        expectedFilterPushDownExplain));
+  }
+
+  @Test
+  public void testFilterPushDownAnd() {
+    String sqlAnd = String.format("SELECT * FROM %s WHERE id = 1 AND data = 
'a' ", TABLE_NAME);
+    String explainAnd = getTableEnv().explainSql(sqlAnd);
+    assertTrue("explain should contains FilterPushDown", 
explainAnd.contains(expectedFilterPushDownExplain));
+
+    List<Object[]> resultAnd = sql(sqlAnd);
+    assertEquals("should have 1 record", 1, resultAnd.size());
+    assertArrayEquals("Should produce the expected record", resultAnd.get(0), 
new Object[] {1, "a"});
+
+    assertEquals("Should create only one scan", 1, scanEventCount);
+    Assert.assertEquals("Should push down expected filter", "(id = 1 AND data 
= 'a')",
+        FlinkUtil.describe(lastScanEvent.filter()));
+  }
+
+  @Test
+  public void testFilterPushDownOr() {
+    String sqlOr = String.format("SELECT * FROM %s WHERE id = 1 OR data = 'b' 
", TABLE_NAME);
+    String explainOr = getTableEnv().explainSql(sqlOr);
+    assertTrue("explain should contains FilterPushDown", 
explainOr.contains(expectedFilterPushDownExplain));
+    List<Object[]> resultOr = sql(sqlOr);
+    assertEquals("should have 2 record", 2, resultOr.size());
+
+    List<Object[]> expectedOR = Lists.newArrayList();
+    expectedOR.add(new Object[] {1, "a"});
+    expectedOR.add(new Object[] {2, "b"});
+    assertArrayEquals("Should produce the expected record", 
resultOr.toArray(), expectedOR.toArray());
+
+    assertEquals("Should create only one scan", 1, scanEventCount);
+    Assert.assertEquals("Should push down expected filter", "(id = 1 OR data = 
'b')",
+        FlinkUtil.describe(lastScanEvent.filter()));
+  }
+
+  @Test
+  public void testFilterPushDownGreaterThan() {
+    String sqlGT = String.format("SELECT * FROM %s WHERE id > 1 ", TABLE_NAME);
+    String explainGT = getTableEnv().explainSql(sqlGT);
+    assertTrue("explain should contains FilterPushDown", 
explainGT.contains(expectedFilterPushDownExplain));
+
+    List<Object[]> resultGT = sql(sqlGT);
+    assertEquals("should have 2 record", 2, resultGT.size());
+    List<Object[]> expectedGT = Lists.newArrayList();
+    expectedGT.add(new Object[] {2, "b"});
+    expectedGT.add(new Object[] {3, null});
+    assertArrayEquals("Should produce the expected record", 
resultGT.toArray(), expectedGT.toArray());
+
+    assertEquals("Should create only one scan", 1, scanEventCount);
+    Assert.assertEquals("Should push down expected filter", "id > 1", 
FlinkUtil.describe(lastScanEvent.filter()));
+  }
+
+  @Test
+  public void testFilterPushDownGreaterThanEqual() {
+    String sqlGTE = String.format("SELECT * FROM %s WHERE id >= 2 ", 
TABLE_NAME);
+    String explainGTE = getTableEnv().explainSql(sqlGTE);
+    assertTrue("explain should contains FilterPushDown", 
explainGTE.contains(expectedFilterPushDownExplain));
+    List<Object[]> resultGTE = sql(sqlGTE);
+    assertEquals("should have 2 records", 2, resultGTE.size());
+
+    List<Object[]> expectedGTE = Lists.newArrayList();
+    expectedGTE.add(new Object[] {2, "b"});
+    expectedGTE.add(new Object[] {3, null});
+    assertArrayEquals("Should produce the expected record", 
resultGTE.toArray(), expectedGTE.toArray());
+
+    assertEquals("Should create only one scan", 1, scanEventCount);
+    Assert.assertEquals("Should push down expected filter", "id >= 2", 
FlinkUtil.describe(lastScanEvent.filter()));
+  }
+
+  @Test
+  public void testFilterPushDownLessThan() {
+    String sqlLT = String.format("SELECT * FROM %s WHERE id < 2 ", TABLE_NAME);
+    String explainLT = getTableEnv().explainSql(sqlLT);
+    assertTrue("explain should contains FilterPushDown", 
explainLT.contains(expectedFilterPushDownExplain));
+    List<Object[]> resultLT = sql(sqlLT);
+    assertEquals("should have 1 record", 1, resultLT.size());
+    assertArrayEquals("Should produce the expected record", resultLT.get(0), 
new Object[] {1, "a"});
+
+    assertEquals("Should create only one scan", 1, scanEventCount);
+    Assert.assertEquals("Should push down expected filter", "id < 2", 
FlinkUtil.describe(lastScanEvent.filter()));
+  }
+
+  @Test
+  public void testFilterPushDownLessThanEqual() {
+    String sqlLTE = String.format("SELECT * FROM %s WHERE id <= 1 ", 
TABLE_NAME);
+    String explainLTE = getTableEnv().explainSql(sqlLTE);
+    assertTrue("explain should contains FilterPushDown", 
explainLTE.contains(expectedFilterPushDownExplain));
+    List<Object[]> resultLTE = sql(sqlLTE);
+    assertEquals("should have 1 record", 1, resultLTE.size());
+    assertArrayEquals("Should produce the expected record", resultLTE.get(0), 
new Object[] {1, "a"});
+
+    assertEquals("Should create only one scan", 1, scanEventCount);
+    Assert.assertEquals("Should push down expected filter", "id <= 1", 
FlinkUtil.describe(lastScanEvent.filter()));
+  }
+
+  @Test
+  public void testFilterPushDownIn() {
+    String sqlIN = String.format("SELECT * FROM %s WHERE id IN (1,2) ", 
TABLE_NAME);
+    String explainIN = getTableEnv().explainSql(sqlIN);
+    assertTrue("explain should contains FilterPushDown", 
explainIN.contains(expectedFilterPushDownExplain));
+    List<Object[]> resultIN = sql(sqlIN);
+    assertEquals("should have 2 records", 2, resultIN.size());
+    List<Object[]> expectedIN = Lists.newArrayList();
+    expectedIN.add(new Object[] {1, "a"});
+    expectedIN.add(new Object[] {2, "b"});
+    assertArrayEquals("Should produce the expected record", 
resultIN.toArray(), expectedIN.toArray());
+    assertEquals("Should create only one scan", 1, scanEventCount);
+    Assert.assertEquals("Should push down expected filter", "(id = 1 OR id = 
2)",
+        FlinkUtil.describe(lastScanEvent.filter()));
+
+    // in with null will not push down
+    String sqlInNull = String.format("SELECT * FROM %s WHERE id IN (1,2,NULL) 
", TABLE_NAME);
+    String explainInNull = getTableEnv().explainSql(sqlInNull);
+    assertFalse("explain should not contains FilterPushDown", 
explainInNull.contains(expectedFilterPushDownExplain));
+  }
+
+  @Test
+  public void testFilterPushDownNotIn() {
+    String sqlNotIn = String.format("SELECT * FROM %s WHERE id NOT IN (3,2) ", 
TABLE_NAME);
+    String explainNotIn = getTableEnv().explainSql(sqlNotIn);
+    assertTrue("explain should contains FilterPushDown", 
explainNotIn.contains(expectedFilterPushDownExplain));
+    List<Object[]> resultNotIn = sql(sqlNotIn);
+    assertEquals("should have 1 record", 1, resultNotIn.size());
+    assertArrayEquals("Should produce the expected record", 
resultNotIn.get(0), new Object[] {1, "a"});
+    assertEquals("Should create only one scan", 1, scanEventCount);
+    Assert.assertEquals("Should push down expected filter", "(id != 3 AND id 
!= 2)",
+        FlinkUtil.describe(lastScanEvent.filter()));
+
+    String sqlNotInNull = String.format("SELECT * FROM %s WHERE id NOT IN 
(1,2,NULL) ", TABLE_NAME);
+    String explainNotInNull = getTableEnv().explainSql(sqlNotInNull);
+    assertFalse("explain should not contains FilterPushDown", 
explainNotInNull.contains(expectedFilterPushDownExplain));
+  }
+
+  @Test
+  public void testFilterPushDownIsNotNull() {
+    String sqlNotNull = String.format("SELECT * FROM %s WHERE data IS NOT 
NULL", TABLE_NAME);
+    String explainNotNull = getTableEnv().explainSql(sqlNotNull);
+    assertTrue("explain should contains FilterPushDown", 
explainNotNull.contains(expectedFilterPushDownExplain));
+    List<Object[]> resultNotNull = sql(sqlNotNull);
+    assertEquals("should have 2 record", 2, resultNotNull.size());
+    List<Object[]> expected = Lists.newArrayList();
+    expected.add(new Object[] {1, "a"});
+    expected.add(new Object[] {2, "b"});
+    assertArrayEquals("Should produce the expected record", 
resultNotNull.toArray(), expected.toArray());
+
+    assertEquals("Should create only one scan", 1, scanEventCount);
+    Assert.assertEquals("Should push down expected filter", "data IS NOT NULL",
+        FlinkUtil.describe(lastScanEvent.filter()));
+  }
+
+  @Test
+  public void testFilterPushDownIsNull() {
+    String sqlNull = String.format("SELECT * FROM %s WHERE data IS  NULL", 
TABLE_NAME);
+    String explainNull = getTableEnv().explainSql(sqlNull);
+    assertTrue("explain should contains FilterPushDown", 
explainNull.contains(expectedFilterPushDownExplain));
+    List<Object[]> resultNull = sql(sqlNull);
+    assertEquals("should have 1 record", 1, resultNull.size());
+    assertArrayEquals("Should produce the expected record", resultNull.get(0), 
new Object[] {3, null});
+
+    assertEquals("Should create only one scan", 1, scanEventCount);
+    assertEquals("Should push down expected filter", "data IS NULL", 
FlinkUtil.describe(lastScanEvent.filter()));
+  }
+
+  @Test
+  public void testFilterPushDownNot() {
+    String sqlNot = String.format("SELECT * FROM %s WHERE NOT id = 1 ", 
TABLE_NAME);
+    String explainNot = getTableEnv().explainSql(sqlNot);
+    assertTrue("explain should contains FilterPushDown", 
explainNot.contains(expectedFilterPushDownExplain));
+    List<Object[]> resultNot = sql(sqlNot);
+    assertEquals("should have 2 record", 2, resultNot.size());
+    List<Object[]> expectedNot = Lists.newArrayList();
+    expectedNot.add(new Object[] {2, "b"});
+    expectedNot.add(new Object[] {3, null});
+    assertArrayEquals("Should produce the expected record", 
resultNot.toArray(), expectedNot.toArray());
+
+    assertEquals("Should create only one scan", 1, scanEventCount);
+    Assert.assertEquals("Should push down expected filter", "id != 1", 
FlinkUtil.describe(lastScanEvent.filter()));
+  }
+
+  @Test
+  public void testFilterPushDownBetween() {
+    String sqlBetween = String.format("SELECT * FROM %s WHERE id BETWEEN 1 AND 
2 ", TABLE_NAME);
+    String explainBetween = getTableEnv().explainSql(sqlBetween);
+    assertTrue("explain should contains FilterPushDown", 
explainBetween.contains(expectedFilterPushDownExplain));

Review comment:
       How about changing this to check whether `explainBetween` contains a
more detailed string:
`FilterPushDown,the filters :ref(name="id") >= 1,ref(name="id") <= 2]]], fields=[id, data])`?
Checking only `explainBetween.contains("FilterPushDown")` is not precise enough for me.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to