rdblue commented on a change in pull request #1893:
URL: https://github.com/apache/iceberg/pull/1893#discussion_r559001995



##########
File path: 
flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
##########
@@ -75,32 +92,584 @@ public void clean() {
 
   @Test
   public void testLimitPushDown() {
-    sql("INSERT INTO %s  VALUES (1,'a'),(2,'b')", TABLE_NAME);
-
     String querySql = String.format("SELECT * FROM %s LIMIT 1", TABLE_NAME);
     String explain = getTableEnv().explainSql(querySql);
     String expectedExplain = "LimitPushDown : 1";
-    Assert.assertTrue("explain should contains LimitPushDown", 
explain.contains(expectedExplain));
+    Assert.assertTrue("Explain should contain LimitPushDown", 
explain.contains(expectedExplain));
     List<Object[]> result = sql(querySql);
-    Assert.assertEquals("should have 1 record", 1, result.size());
-    Assert.assertArrayEquals("Should produce the expected records", 
result.get(0), new Object[] {1, "a"});
+    Assert.assertEquals("Should have 1 record", 1, result.size());
+    Assert.assertArrayEquals("Should produce the expected records", new 
Object[] {1, "iceberg", 10.0}, result.get(0));
 
     AssertHelpers.assertThrows("Invalid limit number: -1 ", 
SqlParserException.class,
         () -> sql("SELECT * FROM %s LIMIT -1", TABLE_NAME));
 
-    Assert.assertEquals("should have 0 record", 0, sql("SELECT * FROM %s LIMIT 
0", TABLE_NAME).size());
+    Assert.assertEquals("Should have 0 record", 0, sql("SELECT * FROM %s LIMIT 
0", TABLE_NAME).size());
 
-    String sqlLimitExceed = String.format("SELECT * FROM %s LIMIT 3", 
TABLE_NAME);
+    String sqlLimitExceed = String.format("SELECT * FROM %s LIMIT 4", 
TABLE_NAME);
     List<Object[]> resultExceed = sql(sqlLimitExceed);
-    Assert.assertEquals("should have 2 record", 2, resultExceed.size());
+    Assert.assertEquals("Should have 3 record", 3, resultExceed.size());
     List<Object[]> expectedList = Lists.newArrayList();
-    expectedList.add(new Object[] {1, "a"});
-    expectedList.add(new Object[] {2, "b"});
-    Assert.assertArrayEquals("Should produce the expected records", 
resultExceed.toArray(), expectedList.toArray());
+    expectedList.add(new Object[] {1, "iceberg", 10.0});
+    expectedList.add(new Object[] {2, "b", 20.0});
+    expectedList.add(new Object[] {3, null, 30.0});
+    Assert.assertArrayEquals("Should produce the expected records", 
expectedList.toArray(), resultExceed.toArray());
 
     String sqlMixed = String.format("SELECT * FROM %s WHERE id = 1 LIMIT 2", 
TABLE_NAME);
     List<Object[]> mixedResult = sql(sqlMixed);
-    Assert.assertEquals("should have 1 record", 1, mixedResult.size());
-    Assert.assertArrayEquals("Should produce the expected records", 
mixedResult.get(0), new Object[] {1, "a"});
+    Assert.assertEquals("Should have 1 record", 1, mixedResult.size());
+    Assert.assertArrayEquals("Should produce the expected records",
+        new Object[] {1, "iceberg", 10.0}, mixedResult.get(0));
+  }
+
+  @Test
+  public void testNoFilterPushDown() {
+    String sql = String.format("SELECT * FROM %s ", TABLE_NAME);
+    String explain = getTableEnv().explainSql(sql);
+    Assert.assertFalse("Explain should not contain FilterPushDown", 
explain.contains(expectedFilterPushDownExplain));
+    Assert.assertNull("Should not push down a filter", lastScanEvent);
+  }
+
+  @Test
+  public void testFilterPushDownEqual() {
+    String sqlLiteralRight = String.format("SELECT * FROM %s WHERE id = 1 ", 
TABLE_NAME);
+    String explain = getTableEnv().explainSql(sqlLiteralRight);
+    String expectedFilter = "ref(name=\"id\") == 1";
+    Assert.assertTrue("Explain should contain the push down filter", 
explain.contains(expectedFilter));
+
+    List<Object[]> result = sql(sqlLiteralRight);
+    Assert.assertEquals("Should have 1 record", 1, result.size());
+    Assert.assertArrayEquals("Should produce the expected record", new 
Object[] {1, "iceberg", 10.0}, result.get(0));
+
+    Assert.assertEquals("Should create only one scan", 1, scanEventCount);
+    Assert.assertEquals("Should contain the push down filter", expectedFilter, 
lastScanEvent.filter().toString());
+  }
+
+  @Test
+  public void testFilterPushDownEqualNull() {
+    String sqlEqualNull = String.format("SELECT * FROM %s WHERE data = NULL ", 
TABLE_NAME);
+    String explainEqualNull = getTableEnv().explainSql(sqlEqualNull);
+    Assert.assertFalse("Explain should not contain FilterPushDown",
+        explainEqualNull.contains(expectedFilterPushDownExplain));
+
+    List<Object[]> result = sql(sqlEqualNull);
+    Assert.assertEquals("Should have 0 record", 0, result.size());
+    Assert.assertNull("Should not push down a filter", lastScanEvent);
+  }
+
+  @Test
+  public void testFilterPushDownEqualLiteralOnLeft() {
+    String sqlLiteralLeft = String.format("SELECT * FROM %s WHERE 1 = id ", 
TABLE_NAME);
+    String explainLeft = getTableEnv().explainSql(sqlLiteralLeft);
+    String expectedFilter = "ref(name=\"id\") == 1";
+    Assert.assertTrue("Explain should contain the push down filter", 
explainLeft.contains(expectedFilter));
+
+    List<Object[]> resultLeft = sql(sqlLiteralLeft);
+    Assert.assertEquals("Should have 1 record", 1, resultLeft.size());
+    Assert
+        .assertArrayEquals("Should produce the expected record", new Object[] 
{1, "iceberg", 10.0}, resultLeft.get(0));
+
+    Assert.assertEquals("Should create only one scan", 1, scanEventCount);
+    Assert.assertEquals("Should contain the push down filter", expectedFilter, 
lastScanEvent.filter().toString());
+  }
+
+  @Test
+  public void testFilterPushDownNoEqual() {
+    String sqlNE = String.format("SELECT * FROM %s WHERE id <> 1 ", 
TABLE_NAME);
+    String explainNE = getTableEnv().explainSql(sqlNE);
+    String expectedFilter = "ref(name=\"id\") != 1";
+    Assert.assertTrue("Explain should contain the push down filter", 
explainNE.contains(expectedFilter));
+
+    List<Object[]> resultNE = sql(sqlNE);
+    Assert.assertEquals("Should have 2 records", 2, resultNE.size());
+
+    List<Object[]> expectedNE = Lists.newArrayList();
+    expectedNE.add(new Object[] {2, "b", 20.0});
+    expectedNE.add(new Object[] {3, null, 30.0});
+    Assert.assertArrayEquals("Should produce the expected record", 
expectedNE.toArray(), resultNE.toArray());
+    Assert.assertEquals("Should create only one scan", 1, scanEventCount);
+    Assert.assertEquals("Should contain the push down filter", expectedFilter, 
lastScanEvent.filter().toString());
+  }
+
+  @Test
+  public void testFilterPushDownNoEqualNull() {
+    String sqlNotEqualNull = String.format("SELECT * FROM %s WHERE data <> 
NULL ", TABLE_NAME);
+    String explainNotEqualNull = getTableEnv().explainSql(sqlNotEqualNull);
+    Assert.assertFalse("Explain should not contain FilterPushDown", 
explainNotEqualNull.contains(
+        expectedFilterPushDownExplain));
+
+    List<Object[]> resultNE = sql(sqlNotEqualNull);
+    Assert.assertEquals("Should have 0 records", 0, resultNE.size());
+    Assert.assertNull("Should not push down a filter", lastScanEvent);
+  }
+
+  @Test
+  public void testFilterPushDownAnd() {
+    String sqlAnd = String.format("SELECT * FROM %s WHERE id = 1 AND data = 
'iceberg' ", TABLE_NAME);
+    String explainAnd = getTableEnv().explainSql(sqlAnd);
+    String expectedFilter = "ref(name=\"id\") == 1,ref(name=\"data\") == 
\"iceberg\"";
+    Assert.assertTrue("Explain should contain the push down filter", 
explainAnd.contains(expectedFilter));
+
+    List<Object[]> resultAnd = sql(sqlAnd);
+    Assert.assertEquals("Should have 1 record", 1, resultAnd.size());
+    Assert.assertArrayEquals("Should produce the expected record", new 
Object[] {1, "iceberg", 10.0}, resultAnd.get(0));
+
+    Assert.assertEquals("Should create only one scan", 1, scanEventCount);
+    String expected = "(ref(name=\"id\") == 1 and ref(name=\"data\") == 
\"iceberg\")";
+    Assert.assertEquals("Should contain the push down filter", expected, 
lastScanEvent.filter().toString());
+  }
+
+  @Test
+  public void testFilterPushDownOr() {
+    String sqlOr = String.format("SELECT * FROM %s WHERE id = 1 OR data = 'b' 
", TABLE_NAME);
+    String explainOr = getTableEnv().explainSql(sqlOr);
+    String expectedFilter = "(ref(name=\"id\") == 1 or ref(name=\"data\") == 
\"b\")";
+    Assert.assertTrue("Explain should contain the push down filter", 
explainOr.contains(expectedFilter));
+
+    List<Object[]> resultOr = sql(sqlOr);
+    Assert.assertEquals("Should have 2 record", 2, resultOr.size());
+
+    List<Object[]> expectedOR = Lists.newArrayList();
+    expectedOR.add(new Object[] {1, "iceberg", 10.0});
+    expectedOR.add(new Object[] {2, "b", 20.0});
+    Assert.assertArrayEquals("Should produce the expected record", 
expectedOR.toArray(), resultOr.toArray());
+
+    Assert.assertEquals("Should create only one scan", 1, scanEventCount);
+    Assert.assertEquals("Should contain the push down filter", expectedFilter, 
lastScanEvent.filter().toString());
+  }
+
+  @Test
+  public void testFilterPushDownGreaterThan() {
+    String sqlGT = String.format("SELECT * FROM %s WHERE id > 1 ", TABLE_NAME);
+    String explainGT = getTableEnv().explainSql(sqlGT);
+    String expectedFilter = "ref(name=\"id\") > 1";
+    Assert.assertTrue("Explain should contain the push down filter", 
explainGT.contains(expectedFilter));
+
+    List<Object[]> resultGT = sql(sqlGT);
+    Assert.assertEquals("Should have 2 record", 2, resultGT.size());
+
+    List<Object[]> expectedGT = Lists.newArrayList();
+    expectedGT.add(new Object[] {2, "b", 20.0});
+    expectedGT.add(new Object[] {3, null, 30.0});
+    Assert.assertArrayEquals("Should produce the expected record", 
expectedGT.toArray(), resultGT.toArray());
+
+    Assert.assertEquals("Should create only one scan", 1, scanEventCount);
+    Assert.assertEquals("Should contain the push down filter", expectedFilter, 
lastScanEvent.filter().toString());
+  }
+
+  @Test
+  public void testFilterPushDownGreaterThanNull() {
+    String sqlGT = String.format("SELECT * FROM %s WHERE data > null ", 
TABLE_NAME);
+    String explainGT = getTableEnv().explainSql(sqlGT);
+    Assert.assertFalse("Explain should not contain FilterPushDown", 
explainGT.contains(expectedFilterPushDownExplain));
+
+    List<Object[]> resultGT = sql(sqlGT);
+    Assert.assertEquals("Should have 0 record", 0, resultGT.size());
+    Assert.assertNull("Should not push down a filter", lastScanEvent);
+  }
+
+  @Test
+  public void testFilterPushDownGreaterThanLiteralOnLeft() {
+    String sqlGT = String.format("SELECT * FROM %s WHERE 3 > id ", TABLE_NAME);
+    String explainGT = getTableEnv().explainSql(sqlGT);
+    String expectedFilter = "ref(name=\"id\") < 3";
+    Assert.assertTrue("Explain should contain the push down filter", 
explainGT.contains(expectedFilter));
+
+    List<Object[]> resultGT = sql(sqlGT);
+    Assert.assertEquals("Should have 2 records", 2, resultGT.size());
+
+    List<Object[]> expectedGT = Lists.newArrayList();
+    expectedGT.add(new Object[] {1, "iceberg", 10.0});
+    expectedGT.add(new Object[] {2, "b", 20.0});
+    Assert.assertArrayEquals("Should produce the expected record", 
expectedGT.toArray(), resultGT.toArray());
+
+    Assert.assertEquals("Should create only one scan", 1, scanEventCount);
+    Assert.assertEquals("Should contain the push down filter", expectedFilter, 
lastScanEvent.filter().toString());
+  }
+
+  @Test
+  public void testFilterPushDownGreaterThanEqual() {
+    String sqlGTE = String.format("SELECT * FROM %s WHERE id >= 2 ", 
TABLE_NAME);
+    String explainGTE = getTableEnv().explainSql(sqlGTE);
+    String expectedFilter = "ref(name=\"id\") >= 2";
+    Assert.assertTrue("Explain should contain the push down filter", 
explainGTE.contains(expectedFilter));
+
+    List<Object[]> resultGTE = sql(sqlGTE);
+    Assert.assertEquals("Should have 2 records", 2, resultGTE.size());
+
+    List<Object[]> expectedGTE = Lists.newArrayList();
+    expectedGTE.add(new Object[] {2, "b", 20.0});
+    expectedGTE.add(new Object[] {3, null, 30.0});
+    Assert.assertArrayEquals("Should produce the expected record", 
expectedGTE.toArray(), resultGTE.toArray());
+
+    Assert.assertEquals("Should create only one scan", 1, scanEventCount);
+    Assert.assertEquals("Should contain the push down filter", expectedFilter, 
lastScanEvent.filter().toString());
+  }
+
+  @Test
+  public void testFilterPushDownGreaterThanEqualNull() {
+    String sqlGTE = String.format("SELECT * FROM %s WHERE data >= null ", 
TABLE_NAME);
+    String explainGTE = getTableEnv().explainSql(sqlGTE);
+    Assert.assertFalse("Explain should not contain FilterPushDown", 
explainGTE.contains(expectedFilterPushDownExplain));
+
+    List<Object[]> resultGT = sql(sqlGTE);
+    Assert.assertEquals("Should have 0 record", 0, resultGT.size());
+    Assert.assertNull("Should not push down a filter", lastScanEvent);
+  }
+
+  @Test
+  public void testFilterPushDownGreaterThanEqualLiteralOnLeft() {
+    String sqlGTE = String.format("SELECT * FROM %s WHERE 2 >= id ", 
TABLE_NAME);
+    String explainGTE = getTableEnv().explainSql(sqlGTE);
+    String expectedFilter = "ref(name=\"id\") <= 2";
+    Assert.assertTrue("Explain should contain the push down filter", 
explainGTE.contains(expectedFilter));
+
+    List<Object[]> resultGTE = sql(sqlGTE);
+    Assert.assertEquals("Should have 2 records", 2, resultGTE.size());
+
+    List<Object[]> expectedGTE = Lists.newArrayList();
+    expectedGTE.add(new Object[] {1, "iceberg", 10.0});
+    expectedGTE.add(new Object[] {2, "b", 20.0});
+    Assert.assertArrayEquals("Should produce the expected record", 
expectedGTE.toArray(), resultGTE.toArray());
+
+    Assert.assertEquals("Should create only one scan", 1, scanEventCount);
+    Assert.assertEquals("Should contain the push down filter", expectedFilter, 
lastScanEvent.filter().toString());
+  }
+
+  @Test
+  public void testFilterPushDownLessThan() {
+    String sqlLT = String.format("SELECT * FROM %s WHERE id < 2 ", TABLE_NAME);
+    String explainLT = getTableEnv().explainSql(sqlLT);
+    String expectedFilter = "ref(name=\"id\") < 2";
+    Assert.assertTrue("Explain should contain the push down filter", 
explainLT.contains(expectedFilter));
+
+    List<Object[]> resultLT = sql(sqlLT);
+    Assert.assertEquals("Should have 1 record", 1, resultLT.size());
+    Assert.assertArrayEquals("Should produce the expected record", new 
Object[] {1, "iceberg", 10.0}, resultLT.get(0));
+
+    Assert.assertEquals("Should create only one scan", 1, scanEventCount);
+    Assert.assertEquals("Should contain the push down filter", expectedFilter, 
lastScanEvent.filter().toString());
+  }
+
+  @Test
+  public void testFilterPushDownLessThanNull() {
+    String sqlLT = String.format("SELECT * FROM %s WHERE data < null ", 
TABLE_NAME);
+    String explainLT = getTableEnv().explainSql(sqlLT);
+    Assert.assertFalse("Explain should not contain FilterPushDown", 
explainLT.contains(expectedFilterPushDownExplain));
+
+    List<Object[]> resultGT = sql(sqlLT);
+    Assert.assertEquals("Should have 0 record", 0, resultGT.size());
+    Assert.assertNull("Should not push down a filter", lastScanEvent);
+  }
+
+  @Test
+  public void testFilterPushDownLessThanLiteralOnLeft() {
+    String sqlLT = String.format("SELECT * FROM %s WHERE 2 < id ", TABLE_NAME);
+    String explainLT = getTableEnv().explainSql(sqlLT);
+    String expectedFilter = "ref(name=\"id\") > 2";
+    Assert.assertTrue("Explain should contain the push down filter", 
explainLT.contains(expectedFilter));
+
+    List<Object[]> resultLT = sql(sqlLT);
+    Assert.assertEquals("Should have 1 record", 1, resultLT.size());
+    Assert.assertArrayEquals("Should produce the expected record", new 
Object[] {3, null, 30.0}, resultLT.get(0));
+
+    Assert.assertEquals("Should create only one scan", 1, scanEventCount);
+    Assert.assertEquals("Should contain the push down filter", expectedFilter, 
lastScanEvent.filter().toString());
+  }
+
+  @Test
+  public void testFilterPushDownLessThanEqual() {
+    String sqlLTE = String.format("SELECT * FROM %s WHERE id <= 1 ", 
TABLE_NAME);
+    String explainLTE = getTableEnv().explainSql(sqlLTE);
+    String expectedFilter = "ref(name=\"id\") <= 1";
+    Assert.assertTrue("Explain should contain the push down filter", 
explainLTE.contains(expectedFilter));
+
+    List<Object[]> resultLTE = sql(sqlLTE);
+    Assert.assertEquals("Should have 1 record", 1, resultLTE.size());
+    Assert.assertArrayEquals("Should produce the expected record", new 
Object[] {1, "iceberg", 10.0}, resultLTE.get(0));
+
+    Assert.assertEquals("Should create only one scan", 1, scanEventCount);
+    Assert.assertEquals("Should contain the push down filter", expectedFilter, 
lastScanEvent.filter().toString());
+  }
+
+  @Test
+  public void testFilterPushDownLessThanEqualNull() {
+    String sqlLTE = String.format("SELECT * FROM %s WHERE data <= null ", 
TABLE_NAME);
+    String explainLTE = getTableEnv().explainSql(sqlLTE);
+    Assert.assertFalse("Explain should not contain FilterPushDown", 
explainLTE.contains(expectedFilterPushDownExplain));
+
+    List<Object[]> resultGT = sql(sqlLTE);
+    Assert.assertEquals("Should have 0 record", 0, resultGT.size());
+    Assert.assertNull("Should not push down a filter", lastScanEvent);
+  }
+
+  @Test
+  public void testFilterPushDownLessThanEqualLiteralOnLeft() {
+    String sqlLTE = String.format("SELECT * FROM %s WHERE 3 <= id  ", 
TABLE_NAME);
+    String explainLTE = getTableEnv().explainSql(sqlLTE);
+    String expectedFilter = "ref(name=\"id\") >= 3";
+    Assert.assertTrue("Explain should contain the push down filter", 
explainLTE.contains(expectedFilter));
+
+    List<Object[]> resultLTE = sql(sqlLTE);
+    Assert.assertEquals("Should have 1 record", 1, resultLTE.size());
+    Assert.assertArrayEquals("Should produce the expected record", new 
Object[] {3, null, 30.0}, resultLTE.get(0));
+
+    Assert.assertEquals("Should create only one scan", 1, scanEventCount);
+    Assert.assertEquals("Should contain the push down filter", expectedFilter, 
lastScanEvent.filter().toString());
+  }
+
+  @Test
+  public void testFilterPushDownIn() {
+    String sqlIN = String.format("SELECT * FROM %s WHERE id IN (1,2) ", 
TABLE_NAME);
+    String explainIN = getTableEnv().explainSql(sqlIN);
+    String expectedFilter = "(ref(name=\"id\") == 1 or ref(name=\"id\") == 2)";
+    Assert.assertTrue("Explain should contain the push down filter", 
explainIN.contains(expectedFilter));
+    List<Object[]> resultIN = sql(sqlIN);
+    Assert.assertEquals("Should have 2 records", 2, resultIN.size());
+
+    List<Object[]> expectedIN = Lists.newArrayList();
+    expectedIN.add(new Object[] {1, "iceberg", 10.0});
+    expectedIN.add(new Object[] {2, "b", 20.0});
+    Assert.assertArrayEquals("Should produce the expected record", 
expectedIN.toArray(), resultIN.toArray());
+    Assert.assertEquals("Should create only one scan", 1, scanEventCount);
+    Assert.assertEquals("Should contain the push down filter", expectedFilter, 
lastScanEvent.filter().toString());
+  }
+
+  @Test
+  public void testFilterPushDownInNull() {
+    String sqlInNull = String.format("SELECT * FROM %s WHERE data IN 
('iceberg',NULL) ", TABLE_NAME);
+    String explainInNull = getTableEnv().explainSql(sqlInNull);
+    Assert.assertFalse("Explain should not contain FilterPushDown",
+        explainInNull.contains(expectedFilterPushDownExplain));
+
+    List<Object[]> result = sql(sqlInNull);
+    Assert.assertEquals("Should have 1 record", 1, result.size());
+    Assert.assertArrayEquals("Should produce the expected record", new 
Object[] {1, "iceberg", 10.0}, result.get(0));
+    Assert.assertEquals("Should not push down a filter", 
Expressions.alwaysTrue(), lastScanEvent.filter());
+  }
+
+  @Test
+  public void testFilterPushDownNotIn() {
+    String sqlNotIn = String.format("SELECT * FROM %s WHERE id NOT IN (3,2) ", 
TABLE_NAME);
+    String explainNotIn = getTableEnv().explainSql(sqlNotIn);
+    String expectedFilter = "ref(name=\"id\") != 3,ref(name=\"id\") != 2";
+    Assert.assertTrue("Explain should contain the push down filter", 
explainNotIn.contains(expectedFilter));
+
+    List<Object[]> resultNotIn = sql(sqlNotIn);
+    Assert.assertEquals("Should have 1 record", 1, resultNotIn.size());
+    Assert
+        .assertArrayEquals("Should produce the expected record", new Object[] 
{1, "iceberg", 10.0}, resultNotIn.get(0));
+    Assert.assertEquals("Should create only one scan", 1, scanEventCount);
+    String expectedScan = "(ref(name=\"id\") != 3 and ref(name=\"id\") != 2)";
+    Assert.assertEquals("Should contain the push down filter", expectedScan, 
lastScanEvent.filter().toString());
+  }
+
+  @Test
+  public void testFilterPushDownNotInNull() {
+    String sqlNotInNull = String.format("SELECT * FROM %s WHERE id NOT IN 
(1,2,NULL) ", TABLE_NAME);
+    String explainNotInNull = getTableEnv().explainSql(sqlNotInNull);
+    Assert.assertFalse("Explain should not contain FilterPushDown",
+        explainNotInNull.contains(expectedFilterPushDownExplain));
+    List<Object[]> resultGT = sql(sqlNotInNull);
+    Assert.assertEquals("Should have 0 record", 0, resultGT.size());
+    Assert.assertEquals("Should not push down a filter", 
Expressions.alwaysTrue(), lastScanEvent.filter());
+  }
+
+  @Test
+  public void testFilterPushDownIsNotNull() {
+    String sqlNotNull = String.format("SELECT * FROM %s WHERE data IS NOT 
NULL", TABLE_NAME);
+    String explainNotNull = getTableEnv().explainSql(sqlNotNull);
+    String expectedFilter = "not_null(ref(name=\"data\"))";
+    Assert.assertTrue("Explain should contain the push down filter", 
explainNotNull.contains(expectedFilter));
+
+    List<Object[]> resultNotNull = sql(sqlNotNull);
+    Assert.assertEquals("Should have 2 record", 2, resultNotNull.size());
+
+    List<Object[]> expected = Lists.newArrayList();
+    expected.add(new Object[] {1, "iceberg", 10.0});
+    expected.add(new Object[] {2, "b", 20.0});
+    Assert.assertArrayEquals("Should produce the expected record", 
expected.toArray(), resultNotNull.toArray());
+
+    Assert.assertEquals("Should create only one scan", 1, scanEventCount);
+    Assert.assertEquals("Should contain the push down filter", expectedFilter, 
lastScanEvent.filter().toString());
+  }
+
+  @Test
+  public void testFilterPushDownIsNull() {
+    String sqlNull = String.format("SELECT * FROM %s WHERE data IS  NULL", 
TABLE_NAME);
+    String explainNull = getTableEnv().explainSql(sqlNull);
+    String expectedFilter = "is_null(ref(name=\"data\"))";
+    Assert.assertTrue("Explain should contain the push down filter", 
explainNull.contains(expectedFilter));
+
+    List<Object[]> resultNull = sql(sqlNull);
+    Assert.assertEquals("Should have 1 record", 1, resultNull.size());
+    Assert.assertArrayEquals("Should produce the expected record", new 
Object[] {3, null, 30.0}, resultNull.get(0));
+
+    Assert.assertEquals("Should create only one scan", 1, scanEventCount);
+    Assert.assertEquals("Should contain the push down filter", expectedFilter, 
lastScanEvent.filter().toString());
+  }
+
+  @Test
+  public void testFilterPushDownNot() {
+    String sqlNot = String.format("SELECT * FROM %s WHERE NOT (id = 1 OR id = 
2 ) ", TABLE_NAME);
+    String explainNot = getTableEnv().explainSql(sqlNot);
+    String expectedFilter = "ref(name=\"id\") != 1,ref(name=\"id\") != 2";
+    Assert.assertTrue("Explain should contain the push down filter", 
explainNot.contains(expectedFilter));
+
+    List<Object[]> resultNot = sql(sqlNot);
+    Assert.assertEquals("Should have 1 record", 1, resultNot.size());
+    Assert.assertArrayEquals("Should produce the expected record", new 
Object[] {3, null, 30.0}, resultNot.get(0));
+
+    Assert.assertEquals("Should create only one scan", 1, scanEventCount);
+    expectedFilter = "(ref(name=\"id\") != 1 and ref(name=\"id\") != 2)";
+    Assert.assertEquals("Should contain the push down filter", expectedFilter, 
lastScanEvent.filter().toString());
+  }
+
+  @Test
+  public void testFilterPushDownBetween() {
+    String sqlBetween = String.format("SELECT * FROM %s WHERE id BETWEEN 1 AND 
2 ", TABLE_NAME);
+    String explainBetween = getTableEnv().explainSql(sqlBetween);
+    String expectedFilter = "ref(name=\"id\") >= 1,ref(name=\"id\") <= 2";
+    Assert.assertTrue("Explain should contain the push down filter", 
explainBetween.contains(expectedFilter));
+
+    List<Object[]> resultBetween = sql(sqlBetween);
+    Assert.assertEquals("Should have 2 record", 2, resultBetween.size());
+
+    List<Object[]> expectedBetween = Lists.newArrayList();
+    expectedBetween.add(new Object[] {1, "iceberg", 10.0});
+    expectedBetween.add(new Object[] {2, "b", 20.0});
+    Assert.assertArrayEquals("Should produce the expected record", 
expectedBetween.toArray(), resultBetween.toArray());
+
+    Assert.assertEquals("Should create only one scan", 1, scanEventCount);
+    String expected = "(ref(name=\"id\") >= 1 and ref(name=\"id\") <= 2)";
+    Assert.assertEquals("Should contain the push down filter", expected, 
lastScanEvent.filter().toString());
+  }
+
+  @Test
+  public void testFilterPushDownNotBetween() {
+    String sqlNotBetween = String.format("SELECT * FROM %s WHERE id  NOT 
BETWEEN 2 AND 3 ", TABLE_NAME);
+    String explainNotBetween = getTableEnv().explainSql(sqlNotBetween);
+    String expectedFilter = "(ref(name=\"id\") < 2 or ref(name=\"id\") > 3)";
+    Assert.assertTrue("Explain should contain the push down filter", 
explainNotBetween.contains(expectedFilter));
+
+    List<Object[]> resultNotBetween = sql(sqlNotBetween);
+    Assert.assertEquals("Should have 1 record", 1, resultNotBetween.size());
+    Assert.assertArrayEquals("the not between Should produce the expected 
record", resultNotBetween.get(0),
+        new Object[] {1, "iceberg", 10.0});
+
+    Assert.assertEquals("Should create only one scan", 1, scanEventCount);
+    Assert.assertEquals("Should contain the push down filter", expectedFilter, 
lastScanEvent.filter().toString());
+  }
+
+  @Test
+  public void testFilterPushDownLike() {
+    String sqlLike = "SELECT * FROM " + TABLE_NAME + " WHERE data LIKE 'ice%' 
";
+    String explainLike = getTableEnv().explainSql(sqlLike);
+    String expectedFilter = "ref(name=\"data\") startsWith \"\"ice\"\"";
+    Assert.assertTrue("the like sql Explain should contain the push down 
filter", explainLike.contains(expectedFilter));
+
+    sqlLike = "SELECT * FROM " + TABLE_NAME + " WHERE data LIKE 'ice%%' ";
+    List<Object[]> resultLike = sql(sqlLike);
+    Assert.assertEquals("Should have 1 record", 1, resultLike.size());
+    Assert.assertArrayEquals("The like result should produce the expected 
record",
+        new Object[] {1, "iceberg", 10.0}, resultLike.get(0));
+    Assert.assertEquals("Should create only one scan", 1, scanEventCount);
+    Assert.assertEquals("Should contain the push down filter", expectedFilter, 
lastScanEvent.filter().toString());
+  }
+
+  @Test
+  public void testFilterNotPushDownLike() {
+    String sqlNoPushDown = "SELECT * FROM " + TABLE_NAME + " WHERE data LIKE 
'%i' ";
+    String explainNoPushDown = getTableEnv().explainSql(sqlNoPushDown);
+    Assert.assertFalse("Explain should not contain FilterPushDown",
+        explainNoPushDown.contains(expectedFilterPushDownExplain));
+    sqlNoPushDown = "SELECT * FROM " + TABLE_NAME + " WHERE data LIKE '%%i' ";
+    List<Object[]> resultLike = sql(sqlNoPushDown);
+    Assert.assertEquals("Should have 1 record", 0, resultLike.size());
+    Assert.assertEquals("Should not push down a filter", 
Expressions.alwaysTrue(), lastScanEvent.filter());
+
+    sqlNoPushDown = "SELECT * FROM " + TABLE_NAME + " WHERE data LIKE '%i%' ";
+    explainNoPushDown = getTableEnv().explainSql(sqlNoPushDown);
+    Assert.assertFalse("Explain should not contain FilterPushDown",
+        explainNoPushDown.contains(expectedFilterPushDownExplain));
+    sqlNoPushDown = "SELECT * FROM " + TABLE_NAME + " WHERE data LIKE '%%i%%' 
";
+    resultLike = sql(sqlNoPushDown);
+    Assert.assertEquals("Should have 1 record", 1, resultLike.size());
+    Assert
+        .assertArrayEquals("Should produce the expected record", new Object[] 
{1, "iceberg", 10.0}, resultLike.get(0));
+    Assert.assertEquals("Should not push down a filter", 
Expressions.alwaysTrue(), lastScanEvent.filter());
+
+    sqlNoPushDown = "SELECT * FROM  " + TABLE_NAME + "  WHERE data LIKE 
'%ice%g' ";
+    explainNoPushDown = getTableEnv().explainSql(sqlNoPushDown);
+    Assert.assertFalse("Explain should not contain FilterPushDown",
+        explainNoPushDown.contains(expectedFilterPushDownExplain));
+    sqlNoPushDown = "SELECT * FROM  " + TABLE_NAME + "  WHERE data LIKE 
'%%ice%%g' ";
+    resultLike = sql(sqlNoPushDown);
+    Assert.assertEquals("Should have 1 record", 1, resultLike.size());
+    Assert
+        .assertArrayEquals("Should produce the expected record", new Object[] 
{1, "iceberg", 10.0}, resultLike.get(0));
+    Assert.assertEquals("Should not push down a filter", 
Expressions.alwaysTrue(), lastScanEvent.filter());
+
+    sqlNoPushDown = "SELECT * FROM  " + TABLE_NAME + "  WHERE data LIKE '%' ";
+    explainNoPushDown = getTableEnv().explainSql(sqlNoPushDown);
+    Assert.assertFalse("Explain should not contain FilterPushDown",
+        explainNoPushDown.contains(expectedFilterPushDownExplain));
+    sqlNoPushDown = "SELECT * FROM  " + TABLE_NAME + "  WHERE data LIKE '%%' ";
+    resultLike = sql(sqlNoPushDown);
+    Assert.assertEquals("Should have 2 records", 2, resultLike.size());
+    List<Object[]> expectedRecords = Lists.newArrayList();
+    expectedRecords.add(new Object[] {1, "iceberg", 10.0});
+    expectedRecords.add(new Object[] {2, "b", 20.0});
+    Assert.assertArrayEquals("Should produce the expected record", 
expectedRecords.toArray(), resultLike.toArray());
+    Assert.assertEquals("Should not push down a filter", 
Expressions.alwaysTrue(), lastScanEvent.filter());
+
+    sqlNoPushDown = "SELECT * FROM  " + TABLE_NAME + "  WHERE data LIKE 
'iceber_' ";
+    explainNoPushDown = getTableEnv().explainSql(sqlNoPushDown);
+    Assert.assertFalse("Explain should not contain FilterPushDown",
+        explainNoPushDown.contains(expectedFilterPushDownExplain));
+    resultLike = sql(sqlNoPushDown);
+    Assert.assertEquals("Should have 1 record", 1, resultLike.size());
+    Assert
+        .assertArrayEquals("Should produce the expected record", new Object[] 
{1, "iceberg", 10.0}, resultLike.get(0));
+    Assert.assertEquals("Should not push down a filter", 
Expressions.alwaysTrue(), lastScanEvent.filter());
+  }
+
+  @Test
+  public void testFilterPushDown2Literal() {
+    String sql2Literal = String.format("SELECT * FROM %s WHERE 1 > 0 ", 
TABLE_NAME);
+    String explain2Literal = getTableEnv().explainSql(sql2Literal);
+    Assert.assertFalse("Explain should not contain FilterPushDown",
+        explain2Literal.contains(expectedFilterPushDownExplain));
+    Assert.assertNull("Should not push down a filter", lastScanEvent);
+  }
+
+  /**
+   * NaN is not supported by Flink yet, so this test case asserts the 
parse error. When we upgrade to a Flink version
+   * that supports NaN, we will delete this method and add some test cases to 
test NaN.
+   */
+  @Test
+  public void testSqlParseError() {
+    String sqlParseErrorEqual = String.format("SELECT * FROM %s WHERE d = 
DOUBLE('NaN') ", TABLE_NAME);

Review comment:
       I don't see the function `DOUBLE` documented in Flink's built-in 
functions. I think what you want is to use `CAST` instead: `CAST('NaN' AS 
DOUBLE)`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to