This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4946cf4d6ab0f502f065825e157c7cabde604fe1
Author: Jark Wu <[email protected]>
AuthorDate: Mon Jun 10 17:34:38 2019 +0800

    [FLINK-12708][table] Introduce InputFormatTableSource and make blink&flink planner support it
    
    1. Add an isBounded() method to StreamTableSource, which returns false by default
    2. Introduce InputFormatTableSource, which extends StreamTableSource and whose isBounded() always returns true
    3. InputFormatTableSource only exposes a getInputFormat() method (see the sketch below)
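
    For illustration, a minimal sketch of a user-defined bounded source built on the
    new API, backed by a CollectionInputFormat (a hedged sketch; the class and field
    names are made up for this example and are not part of the commit):

        import java.util.Arrays;

        import org.apache.flink.api.common.ExecutionConfig;
        import org.apache.flink.api.common.io.InputFormat;
        import org.apache.flink.api.common.typeinfo.TypeInformation;
        import org.apache.flink.api.java.io.CollectionInputFormat;
        import org.apache.flink.api.java.typeutils.RowTypeInfo;
        import org.apache.flink.table.api.TableSchema;
        import org.apache.flink.table.api.Types;
        import org.apache.flink.table.sources.InputFormatTableSource;
        import org.apache.flink.types.Row;

        /** Illustrative bounded source; isBounded() is final in the base class and returns true. */
        public class SampleBoundedSource extends InputFormatTableSource<Row> {

            private final String[] fieldNames = {"name", "id"};
            private final TypeInformation<?>[] fieldTypes = {Types.STRING(), Types.LONG()};
            private final RowTypeInfo rowType = new RowTypeInfo(fieldTypes, fieldNames);

            @Override
            public InputFormat<Row, ?> getInputFormat() {
                // Any bounded InputFormat works; here, a fixed in-memory collection.
                return new CollectionInputFormat<>(
                    Arrays.asList(Row.of("Mary", 1L), Row.of("Bob", 2L)),
                    rowType.createSerializer(new ExecutionConfig()));
            }

            @Override
            public TypeInformation<Row> getReturnType() {
                return rowType;
            }

            @Override
            public TableSchema getTableSchema() {
                return new TableSchema(fieldNames, fieldTypes);
            }
        }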
---
 .../flink/table/sources/BatchTableSource.java      |  3 +
 ...ableSource.java => InputFormatTableSource.java} | 30 +++++++---
 .../flink/table/sources/StreamTableSource.java     |  8 +++
 flink-table/flink-table-planner-blink/pom.xml      |  6 ++
 .../flink/table/api/BatchTableEnvironment.scala    | 69 ++++++++++++----------
 .../flink/table/api/StreamTableEnvironment.scala   | 67 +++++++++++----------
 .../physical/batch/BatchExecTableSourceScan.scala  | 13 ++--
 .../batch/BatchExecScanTableSourceRule.scala       |  6 +-
 .../table/plan/schema/BatchTableSourceTable.scala  | 62 -------------------
 .../table/plan/schema/StreamTableSourceTable.scala | 62 -------------------
 .../table/plan/schema/TableSourceSinkTable.scala   |  6 +-
 .../flink/table/plan/schema/TableSourceTable.scala | 34 +++++++++--
 .../flink/table/sources/BatchTableSource.scala     | 38 ------------
 .../flink/table/sources/StreamTableSource.scala    | 38 ------------
 .../plan/stream/sql/join/LookupJoinTest.scala      | 14 +----
 .../table/runtime/batch/sql/TableScanITCase.scala  | 13 ++--
 .../batch/sql/agg/WindowAggregateITCase.scala      | 20 +++----
 .../table/runtime/stream/sql/TableScanITCase.scala |  2 +-
 .../utils/InMemoryLookupableTableSource.scala      |  5 --
 .../apache/flink/table/util/TableTestBase.scala    | 43 ++++++--------
 .../apache/flink/table/util/testTableSources.scala | 10 ++--
 .../apache/flink/table/api/BatchTableEnvImpl.scala | 16 +++--
 .../flink/table/api/StreamTableEnvImpl.scala       |  6 +-
 .../plan/nodes/dataset/BatchTableSourceScan.scala  | 23 +++++++-
 .../rules/dataSet/BatchTableSourceScanRule.scala   |  2 +-
 .../api/validation/TableSourceValidationTest.scala | 23 +++++++-
 .../runtime/batch/table/TableSourceITCase.scala    | 33 +++++++++++
 .../flink/table/utils/testTableSources.scala       | 24 +++++++-
 28 files changed, 306 insertions(+), 370 deletions(-)

diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/BatchTableSource.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/BatchTableSource.java
index 75b421e..c266d87 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/BatchTableSource.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/BatchTableSource.java
@@ -24,7 +24,10 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 /** Defines an external batch table and provides access to its data.
  *
  * @param <T> Type of the {@link DataSet} created by this {@link TableSource}.
+ *
+ * @deprecated use {@link InputFormatTableSource} instead.
  */
+@Deprecated
 public interface BatchTableSource<T> extends TableSource<T> {
 
        /**
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/StreamTableSource.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/InputFormatTableSource.java
similarity index 56%
copy from flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/StreamTableSource.java
copy to flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/InputFormatTableSource.java
index 67ed55f..79fc55a 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/StreamTableSource.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/InputFormatTableSource.java
@@ -18,20 +18,34 @@
 
 package org.apache.flink.table.sources;
 
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
-/** Defines an external stream table and provides read access to its data.
+/**
+ * Defines an external bounded table and provides access to its data.
  *
- * @param <T> Type of the {@link DataStream} created by this {@link TableSource}.
+ * @param <T> Type of the bounded {@link InputFormat} created by this {@link TableSource}.
  */
-public interface StreamTableSource<T> extends TableSource<T> {
+@Experimental
+public abstract class InputFormatTableSource<T> implements StreamTableSource<T> {
 
        /**
-        * Returns the data of the table as a {@link DataStream}.
-        *
-        *<p>NOTE: This method is for internal use only for defining a {@link TableSource}.
-        *       Do not use it in Table API programs.
+        * Returns an {@link InputFormat} for reading the data of the table.
         */
-       DataStream<T> getDataStream(StreamExecutionEnvironment execEnv);
+       public abstract InputFormat<T, ?> getInputFormat();
+
+       /**
+        * Always returns true, which indicates this is a bounded source.
+        */
+       @Override
+       public final boolean isBounded() {
+               return true;
+       }
+
+       @Override
+       public final DataStream<T> getDataStream(StreamExecutionEnvironment execEnv) {
+               return execEnv.createInput(getInputFormat(), getReturnType());
+       }
 }
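
A hedged usage sketch, assuming the 1.9-era Java BatchTableEnvironment API and the hypothetical SampleBoundedSource from the commit message above. With this change the legacy flink planner's BatchTableEnvironment also accepts an InputFormatTableSource, so a bounded source can be registered and queried like any batch table:

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.BatchTableEnvironment;
    import org.apache.flink.types.Row;

    public class BoundedSourceExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
            // Accepted because InputFormatTableSource.isBounded() always returns true.
            tEnv.registerTableSource("bounded_t", new SampleBoundedSource());
            Table result = tEnv.sqlQuery("SELECT name FROM bounded_t");
            tEnv.toDataSet(result, Row.class).print();
        }
    }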
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/StreamTableSource.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/StreamTableSource.java
index 67ed55f..4ec5dac 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/StreamTableSource.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/StreamTableSource.java
@@ -28,6 +28,14 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 public interface StreamTableSource<T> extends TableSource<T> {
 
        /**
+        * Returns true if this is a bounded source, false if this is an unbounded source.
+        * Defaults to false (unbounded) for backwards compatibility.
+        */
+       default boolean isBounded() {
+               return false;
+       }
+
+       /**
         * Returns the data of the table as a {@link DataStream}.
         *
         *<p>NOTE: This method is for internal use only for defining a {@link TableSource}.
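
For contrast, a minimal sketch of an unbounded source that keeps the new default (isBounded() returns false); the class name and the socket host/port are placeholders, not part of this commit:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.api.Types;
    import org.apache.flink.table.sources.StreamTableSource;

    /** Illustrative unbounded source; isBounded() is inherited and stays false. */
    public class SampleUnboundedSource implements StreamTableSource<String> {

        @Override
        public DataStream<String> getDataStream(StreamExecutionEnvironment execEnv) {
            // An unbounded socket text stream; host and port are placeholders.
            return execEnv.socketTextStream("localhost", 9999);
        }

        @Override
        public TypeInformation<String> getReturnType() {
            return Types.STRING();
        }

        @Override
        public TableSchema getTableSchema() {
            return new TableSchema(new String[] {"line"}, new TypeInformation<?>[] {Types.STRING()});
        }
    }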
diff --git a/flink-table/flink-table-planner-blink/pom.xml b/flink-table/flink-table-planner-blink/pom.xml
index 7715207..888c34c 100644
--- a/flink-table/flink-table-planner-blink/pom.xml
+++ b/flink-table/flink-table-planner-blink/pom.xml
@@ -96,6 +96,12 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                </dependency>
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 79ad59c..b8cfae7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -30,7 +30,7 @@ import org.apache.flink.table.plan.nodes.process.DAGProcessContext
 import org.apache.flink.table.plan.nodes.resource.batch.parallelism.BatchParallelismProcessor
 import org.apache.flink.table.plan.optimize.{BatchCommonSubGraphBasedOptimizer, Optimizer}
 import org.apache.flink.table.plan.reuse.DeadlockBreakupProcessor
-import org.apache.flink.table.plan.schema.{BatchTableSourceTable, TableSourceSinkTable, TableSourceTable}
+import org.apache.flink.table.plan.schema.{TableSourceSinkTable, TableSourceTable}
 import org.apache.flink.table.plan.stats.FlinkStatistic
 import org.apache.flink.table.plan.util.{ExecNodePlanDumper, FlinkRelOptUtil}
 import org.apache.flink.table.sinks._
@@ -252,8 +252,8 @@ class BatchTableEnvironment(
   }
 
   /**
-    * Registers an internal [[BatchTableSource]] in this [[TableEnvironment]]'s catalog without
-    * name checking. Registered tables can be referenced in SQL queries.
+    * Registers an internal bounded [[StreamTableSource]] in this [[TableEnvironment]]'s catalog
+    * without name checking. Registered tables can be referenced in SQL queries.
     *
     * @param name        The name under which the [[TableSource]] is registered.
     * @param tableSource The [[TableSource]] to register.
@@ -265,40 +265,47 @@ class BatchTableEnvironment(
       statistic: FlinkStatistic,
       replace: Boolean = false): Unit = {
 
-    tableSource match {
+    def register(): Unit = {
+      // check if a table (source or sink) is registered
+      getTable(name) match {
+        // table source and/or sink is registered
+        case Some(table: TableSourceSinkTable[_, _]) => table.tableSourceTable match {
 
-      // check for proper batch table source
-      case batchTableSource: BatchTableSource[_] =>
-        // check if a table (source or sink) is registered
-        getTable(name) match {
-
-          // table source and/or sink is registered
-          case Some(table: TableSourceSinkTable[_, _]) => table.tableSourceTable match {
-
-            // wrapper contains source
-            case Some(_: TableSourceTable[_]) if !replace =>
-              throw new TableException(s"Table '$name' already exists. " +
-                s"Please choose a different name.")
-
-            // wrapper contains only sink (not source)
-            case _ =>
-              val enrichedTable = new TableSourceSinkTable(
-                Some(new BatchTableSourceTable(batchTableSource, statistic)),
-                table.tableSinkTable)
-              replaceRegisteredTable(name, enrichedTable)
-          }
-
-          // no table is registered
+          // wrapper contains source
+          case Some(_: TableSourceTable[_]) if !replace =>
+            throw new TableException(s"Table '$name' already exists. " +
+              s"Please choose a different name.")
+
+          // wrapper contains only sink (not source)
           case _ =>
-            val newTable = new TableSourceSinkTable(
-              Some(new BatchTableSourceTable(batchTableSource, statistic)),
-              None)
-            registerTableInternal(name, newTable)
+            val enrichedTable = new TableSourceSinkTable(
+              Some(new TableSourceTable(tableSource, false, statistic)),
+              table.tableSinkTable)
+            replaceRegisteredTable(name, enrichedTable)
         }
 
+        // no table is registered
+        case _ =>
+          val newTable = new TableSourceSinkTable(
+            Some(new TableSourceTable(tableSource, false, statistic)),
+            None)
+          registerTableInternal(name, newTable)
+      }
+    }
+
+    tableSource match {
+
+      // check for proper batch table source
+      case boundedTableSource: StreamTableSource[_] if boundedTableSource.isBounded =>
+        register()
+
+      // a lookupable table source can also be registered in the env
+      case _: LookupableTableSource[_] =>
+        register()
+
       // not a batch table source
       case _ =>
-        throw new TableException("Only BatchTableSource can be " +
+        throw new TableException("Only LookupableTableSouce and 
BatchTableSource can be " +
           "registered in BatchTableEnvironment.")
     }
   }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 2f31ef3..c62ff18 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -37,13 +37,12 @@ import org.apache.flink.table.plan.schema._
 import org.apache.flink.table.plan.stats.FlinkStatistic
 import org.apache.flink.table.plan.util.{ExecNodePlanDumper, FlinkRelOptUtil}
 import org.apache.flink.table.sinks.DataStreamTableSink
-import org.apache.flink.table.sources.{StreamTableSource, TableSource}
+import org.apache.flink.table.sources.{LookupableTableSource, StreamTableSource, TableSource}
 import org.apache.flink.table.types.{DataType, LogicalTypeDataTypeConverter}
 import org.apache.flink.table.types.logical.{LogicalType, RowType}
 import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
 import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TypeCheckUtils}
 import org.apache.flink.table.util.PlanUtil
-
 import org.apache.calcite.plan.{ConventionTraitDef, RelTrait, RelTraitDef}
 import org.apache.calcite.sql.SqlExplainLevel
 
@@ -507,41 +506,49 @@ abstract class StreamTableEnvironment(
     //  case _ => // ok
     //}
 
+    def register(): Unit = {
+      // register
+      getTable(name) match {
+
+        // check if a table (source or sink) is registered
+        case Some(table: TableSourceSinkTable[_, _]) => table.tableSourceTable match {
+
+          // wrapper contains source
+          case Some(_: TableSourceTable[_]) if !replace =>
+            throw new TableException(s"Table '$name' already exists. " +
+              s"Please choose a different name.")
+
+          // wrapper contains only sink (not source)
+          case Some(_: TableSourceTable[_]) =>
+            val enrichedTable = new TableSourceSinkTable(
+              Some(new TableSourceTable(tableSource, true, statistic)),
+              table.tableSinkTable)
+            replaceRegisteredTable(name, enrichedTable)
+        }
+
+        // no table is registered
+        case _ =>
+          val newTable = new TableSourceSinkTable(
+            Some(new TableSourceTable(tableSource, true, statistic)),
+            None)
+          registerTableInternal(name, newTable)
+      }
+    }
+
     tableSource match {
 
       // check for proper stream table source
-      case streamTableSource: StreamTableSource[_] =>
-        // register
-        getTable(name) match {
-
-          // check if a table (source or sink) is registered
-          case Some(table: TableSourceSinkTable[_, _]) => table.tableSourceTable match {
-
-            // wrapper contains source
-            case Some(_: TableSourceTable[_]) if !replace =>
-              throw new TableException(s"Table '$name' already exists. " +
-                s"Please choose a different name.")
-
-            // wrapper contains only sink (not source)
-            case Some(_: StreamTableSourceTable[_]) =>
-              val enrichedTable = new TableSourceSinkTable(
-                Some(new StreamTableSourceTable(streamTableSource)),
-                table.tableSinkTable)
-              replaceRegisteredTable(name, enrichedTable)
-          }
+      case streamTableSource: StreamTableSource[_] if !streamTableSource.isBounded =>
+        register()
 
-          // no table is registered
-          case _ =>
-            val newTable = new TableSourceSinkTable(
-              Some(new StreamTableSourceTable(streamTableSource)),
-              None)
-            registerTableInternal(name, newTable)
-        }
+      // a lookupable table source can also be registered in the env
+      case _: LookupableTableSource[_] =>
+        register()
 
       // not a stream table source
       case _ =>
-        throw new TableException(
-          "Only StreamTableSource can be registered in StreamTableEnvironment")
+        throw new TableException("Only LookupableTableSource and unbounded 
StreamTableSource " +
+          "can be registered in StreamTableEnvironment")
     }
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
index ce0a826..6ed27dd 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
@@ -34,7 +34,7 @@ import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
 import org.apache.flink.table.plan.nodes.physical.PhysicalTableSourceScan
 import org.apache.flink.table.plan.schema.FlinkRelOptTable
 import org.apache.flink.table.plan.util.ScanUtil
-import org.apache.flink.table.sources.{BatchTableSource, TableSourceUtil}
+import org.apache.flink.table.sources.{StreamTableSource, TableSourceUtil}
 
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
@@ -47,7 +47,8 @@ import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataTy
 import scala.collection.JavaConversions._
 
 /**
-  * Batch physical RelNode to read data from an external source defined by a [[BatchTableSource]].
+  * Batch physical RelNode to read data from an external source defined by a
+  * bounded [[StreamTableSource]].
   */
 class BatchExecTableSourceScan(
     cluster: RelOptCluster,
@@ -87,8 +88,8 @@ class BatchExecTableSourceScan(
   override def translateToPlanInternal(
       tableEnv: BatchTableEnvironment): StreamTransformation[BaseRow] = {
     val config = tableEnv.getConfig
-    val bts = tableSource.asInstanceOf[BatchTableSource[_]]
-    val inputTransform = bts.getBoundedStream(tableEnv.execEnv).getTransformation
+    val bts = tableSource.asInstanceOf[StreamTableSource[_]] // bounded table source
+    val inputTransform = bts.getDataStream(tableEnv.execEnv).getTransformation
     inputTransform.setParallelism(getResource.getParallelism)
 
     val fieldIndexes = TableSourceUtil.computeIndexMapping(
@@ -141,13 +142,13 @@ class BatchExecTableSourceScan(
       ScanUtil.needsConversion(
         tableSource.getProducedDataType,
         TypeExtractor.createTypeInfo(
-          tableSource, classOf[BatchTableSource[_]], tableSource.getClass, 0)
+          tableSource, classOf[StreamTableSource[_]], tableSource.getClass, 0)
           .getTypeClass.asInstanceOf[Class[_]])
   }
 
   def getSourceTransformation(
       streamEnv: StreamExecutionEnvironment): StreamTransformation[_] = {
-    tableSource.asInstanceOf[BatchTableSource[_]].getBoundedStream(streamEnv).getTransformation
+    tableSource.asInstanceOf[StreamTableSource[_]].getDataStream(streamEnv).getTransformation
   }
 
   def getEstimatedRowCount: lang.Double = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecScanTableSourceRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecScanTableSourceRule.scala
index 8c3e698..3021c56 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecScanTableSourceRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecScanTableSourceRule.scala
@@ -26,7 +26,7 @@ import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableSourceScan
 import org.apache.flink.table.plan.nodes.physical.batch.BatchExecTableSourceScan
 import org.apache.flink.table.plan.schema.{FlinkRelOptTable, TableSourceTable}
-import org.apache.flink.table.sources.BatchTableSource
+import org.apache.flink.table.sources.StreamTableSource
 
 /**
  * Rule that converts [[FlinkLogicalTableSourceScan]] to [[BatchExecTableSourceScan]].
@@ -38,14 +38,14 @@ class BatchExecScanTableSourceRule
     FlinkConventions.BATCH_PHYSICAL,
     "BatchExecScanTableSourceRule") {
 
-  /** Rule must only match if TableScan targets a [[BatchTableSource]] */
+  /** Rule must only match if TableScan targets a bounded [[StreamTableSource]] */
   override def matches(call: RelOptRuleCall): Boolean = {
     val scan: TableScan = call.rel(0).asInstanceOf[TableScan]
     val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable[_]])
     dataSetTable match {
       case tst: TableSourceTable[_] =>
         tst.tableSource match {
-          case _: BatchTableSource[_] => true
+          case sts: StreamTableSource[_] => sts.isBounded
           case _ => false
         }
       case _ => false
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/BatchTableSourceTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/BatchTableSourceTable.scala
deleted file mode 100644
index d01d9e9..0000000
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/BatchTableSourceTable.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.schema
-
-import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.plan.stats.FlinkStatistic
-import org.apache.flink.table.sources.{BatchTableSource, TableSource, TableSourceUtil}
-
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
-
-/**
-  * Class which implements the logic to convert a [[BatchTableSourceTable]] to Calcite Table
-  */
-class BatchTableSourceTable[T](
-    tableSource: BatchTableSource[T],
-    statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
-  extends TableSourceTable(tableSource, statistic) {
-
-  // TODO implements this
-  // TableSourceUtil.validateTableSource(tableSource)
-
-  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
-    TableSourceUtil.getRelDataType(
-      tableSource,
-      None,
-      streaming = false,
-      typeFactory.asInstanceOf[FlinkTypeFactory])
-  }
-
-  /**
-    * Creates a copy of this table, changing statistic.
-    *
-    * @param statistic A new FlinkStatistic.
-    * @return Copy of this table, substituting statistic.
-    */
-  override def copy(statistic: FlinkStatistic) = new BatchTableSourceTable(tableSource, statistic)
-
-  /**
-    * replace table source with the given one, and create a new table source table.
-    *
-    * @param tableSource tableSource to replace.
-    * @return new TableSourceTable
-    */
-  override def replaceTableSource(tableSource: TableSource[T]) =
-    new BatchTableSourceTable(tableSource.asInstanceOf[BatchTableSource[T]], statistic)
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
deleted file mode 100644
index 1dc9ae0..0000000
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.schema
-
-import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.plan.stats.FlinkStatistic
-import org.apache.flink.table.sources.{StreamTableSource, TableSource, TableSourceUtil}
-
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
-
-/**
-  * Class which implements the logic to convert a [[StreamTableSource]] to Calcite Table
-  */
-class StreamTableSourceTable[T](
-    tableSource: StreamTableSource[T],
-    statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
-  extends TableSourceTable(tableSource, statistic) {
-
-  // TODO implements this
-  // TableSourceUtil.validateTableSource(tableSource)
-
-  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
-    TableSourceUtil.getRelDataType(
-      tableSource,
-      None,
-      streaming = true,
-      typeFactory.asInstanceOf[FlinkTypeFactory])
-  }
-
-  /**
-    * Creates a copy of this table, changing statistic.
-    *
-    * @param statistic A new FlinkStatistic.
-    * @return Copy of this table, substituting statistic.
-    */
-  override def copy(statistic: FlinkStatistic) = new StreamTableSourceTable(tableSource, statistic)
-
-  /**
-    * replace table source with the given one, and create a new table source table.
-    *
-    * @param tableSource tableSource to replace.
-    * @return new TableSourceTable
-    */
-  override def replaceTableSource(tableSource: TableSource[T]) =
-    new StreamTableSourceTable(tableSource.asInstanceOf[StreamTableSource[T]], statistic)
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceSinkTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceSinkTable.scala
index 0ed343f..c421d78 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceSinkTable.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceSinkTable.scala
@@ -56,18 +56,18 @@ class TableSourceSinkTable[T1, T2](
   def isSourceTable: Boolean = tableSourceTable.isDefined
 
   def isStreamSourceTable: Boolean = tableSourceTable match {
-    case Some(_: StreamTableSourceTable[_]) => true
+    case Some(tst) => tst.isStreaming
     case _ => false
   }
 
   def isBatchSourceTable: Boolean = tableSourceTable match {
-    case Some(_: BatchTableSourceTable[_]) => true
+    case Some(tst) => !tst.isStreaming
     case _ => false
   }
 
   override def copy(statistic: FlinkStatistic): FlinkTable = {
     new TableSourceSinkTable[T1, T2](
-      tableSourceTable.map(source => source.copy(statistic).asInstanceOf[TableSourceTable[T1]]),
+      tableSourceTable.map(source => source.copy(statistic)),
       tableSinkTable.map(sink => sink.copy(statistic).asInstanceOf[TableSinkTable[T2]]))
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
index fb6e3be..cafb2e9 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
@@ -18,18 +18,42 @@
 
 package org.apache.flink.table.plan.schema
 
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.stats.FlinkStatistic
-import org.apache.flink.table.sources.TableSource
+import org.apache.flink.table.sources.{TableSource, TableSourceUtil}
 
 /**
  * Abstract class which define the interfaces required to convert a [[TableSource]] to
   * a Calcite Table
   */
-abstract class TableSourceTable[T](
+class TableSourceTable[T](
     val tableSource: TableSource[T],
-    val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
+    val isStreaming: Boolean,
+    val statistic: FlinkStatistic)
   extends FlinkTable {
 
+  // TODO implements this
+  // TableSourceUtil.validateTableSource(tableSource)
+
+  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
+    TableSourceUtil.getRelDataType(
+      tableSource,
+      None,
+      streaming = false,
+      typeFactory.asInstanceOf[FlinkTypeFactory])
+  }
+
+  /**
+    * Creates a copy of this table, changing statistic.
+    *
+    * @param statistic A new FlinkStatistic.
+    * @return Copy of this table, substituting statistic.
+    */
+  override def copy(statistic: FlinkStatistic): TableSourceTable[T] = {
+    new TableSourceTable(tableSource, isStreaming, statistic)
+  }
+
   /**
     * Returns statistics of current table.
     */
@@ -41,5 +65,7 @@ abstract class TableSourceTable[T](
     * @param tableSource tableSource to replace.
     * @return new TableSourceTable
     */
-  def replaceTableSource(tableSource: TableSource[T]): TableSourceTable[T]
+  def replaceTableSource(tableSource: TableSource[T]): TableSourceTable[T] = {
+    new TableSourceTable(tableSource, isStreaming, statistic)
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/BatchTableSource.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/BatchTableSource.scala
deleted file mode 100644
index bdd5641..0000000
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/BatchTableSource.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources
-
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-
-/**
-  * Defines an external batch exec table and provides access to its data.
-  *
-  * @tparam T Type of the [[DataStream]] created by this [[TableSource]].
-  */
-trait BatchTableSource[T] extends TableSource[T] {
-
-  /**
-    * Returns the data of the table as a [[DataStream]].
-    *
-    * NOTE: This method is for internal use only for defining a [[TableSource]].
-    * Do not use it in Table API programs.
-    */
-  def getBoundedStream(streamEnv: StreamExecutionEnvironment): DataStream[T]
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/StreamTableSource.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/StreamTableSource.scala
deleted file mode 100644
index 07d7fa0..0000000
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/StreamTableSource.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources
-
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-
-/**
-  * Defines an external stream table and provides access to its data.
-  *
-  * @tparam T Type of the [[DataStream]] created by this [[TableSource]].
-  */
-trait StreamTableSource[T] extends TableSource[T] {
-
-  /**
-    * Returns the data of the table as a [[DataStream]].
-    *
-    * NOTE: This method is for internal use only for defining a [[TableSource]].
-    * Do not use it in Table API programs.
-    */
-  def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
-}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
index 2eb1cb1..9e7ce7b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
@@ -342,9 +342,7 @@ class LookupJoinTest extends TableTestBase with Serializable {
 
 
 class TestTemporalTable
-  extends StreamTableSource[BaseRow]
-  with BatchTableSource[BaseRow]
-  with LookupableTableSource[BaseRow]
+  extends LookupableTableSource[BaseRow]
   with DefinedIndexes {
 
   val fieldNames: Array[String] = Array("id", "name", "age")
@@ -381,16 +379,6 @@ class TestTemporalTable
       .build()
     util.Arrays.asList(index1, index2)
   }
-
-  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[BaseRow] = {
-    throw new UnsupportedOperationException("This TableSource is only used for unit test, " +
-      "this method should never be called.")
-  }
-
-  override def getBoundedStream(streamEnv: StreamExecutionEnvironment): DataStream[BaseRow] = {
-    throw new UnsupportedOperationException("This TableSource is only used for unit test, " +
-      "this method should never be called.")
-  }
 }
 
 class TestInvalidTemporalTable private(
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableScanITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableScanITCase.scala
index e32fce4..eab3a22 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableScanITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableScanITCase.scala
@@ -25,10 +25,9 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.{TableSchema, Types}
 import org.apache.flink.table.runtime.utils.BatchTestBase
 import org.apache.flink.table.runtime.utils.BatchTestBase.row
-import org.apache.flink.table.sources.BatchTableSource
+import org.apache.flink.table.sources.StreamTableSource
 import org.apache.flink.table.util.TestTableSourceWithTime
 import org.apache.flink.types.Row
-
 import org.junit.Test
 
 import java.lang.{Integer => JInt, Long => JLong}
@@ -42,11 +41,13 @@ class TableScanITCase extends BatchTestBase {
   def testTableSourceWithoutTimeAttribute(): Unit = {
     val tableName = "MyTable"
 
-    val tableSource = new BatchTableSource[Row]() {
+    val tableSource = new StreamTableSource[Row]() {
       private val fieldNames: Array[String] = Array("name", "id", "value")
       private val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.LONG, Types.INT)
 
-      override def getBoundedStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
+      override def isBounded: Boolean = true
+
+      override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
         val data = Seq(
           row("Mary", new JLong(1L), new JInt(1)),
           row("Bob", new JLong(2L), new JInt(3))
@@ -77,7 +78,7 @@ class TableScanITCase extends BatchTestBase {
     val schema = new TableSchema(Array("name", "ptime"), Array(Types.STRING, Types.SQL_TIMESTAMP))
     val returnType = Types.STRING
 
-    val tableSource = new TestTableSourceWithTime(schema, returnType, data, null, "ptime")
+    val tableSource = new TestTableSourceWithTime(true, schema, returnType, data, null, "ptime")
     tEnv.registerTableSource(tableName, tableSource)
 
     checkResult(
@@ -105,7 +106,7 @@ class TableScanITCase extends BatchTestBase {
       Array(Types.STRING, Types.SQL_TIMESTAMP, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
       fieldNames)
 
-    val tableSource = new TestTableSourceWithTime(schema, rowType, data, "rtime", null)
+    val tableSource = new TestTableSourceWithTime(true, schema, rowType, data, "rtime", null)
     tEnv.registerTableSource(tableName, tableSource)
 
     checkResult(
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/WindowAggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/WindowAggregateITCase.scala
index c399b1b..13a2ae7 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/WindowAggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/WindowAggregateITCase.scala
@@ -18,25 +18,24 @@
 
 package org.apache.flink.table.runtime.batch.sql.agg
 
+import org.apache.flink.api.common.io.InputFormat
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO}
 import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.TIMESTAMP
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.io.CollectionInputFormat
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.core.io.InputSplit
 
 import scala.collection.JavaConversions._
 import org.apache.flink.table.api.{TableSchema, _}
 import org.apache.flink.table.runtime.utils.BatchTestBase
 import org.apache.flink.table.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.runtime.utils.TestData._
-import org.apache.flink.table.sources.BatchTableSource
+import org.apache.flink.table.sources.InputFormatTableSource
 import org.apache.flink.table.util.DateTimeTestUtil.UTCTimestamp
 import org.apache.flink.table.util.{CountAggFunction, IntAvgAggFunction, IntSumAggFunction}
 import org.apache.flink.types.Row
-
 import org.junit.{Before, Ignore, Test}
 
 class WindowAggregateITCase extends BatchTestBase {
@@ -398,17 +397,14 @@ class WindowAggregateITCase extends BatchTestBase {
     //      "a" -> new ColumnStats(10000000L, 1L, 8D, 8, 5, -5),
     //      "b" -> new ColumnStats(8000000L, 0L, 4D, 32, 6.1D, 0D),
     //      "c" -> new ColumnStats(9000000L, 0L, 1024D, 32, 6.1D, 0D))
-    val table = new BatchTableSource[Row] {
-      override def getReturnType: TypeInformation[Row] =
-        new RowTypeInfo(tableSchema.getFieldTypes, tableSchema.getFieldNames)
+    val table = new InputFormatTableSource[Row] {
+      override def getReturnType: TypeInformation[Row] = type3WithTimestamp
 
       //      override def getTableStats: TableStats = new TableStats(10000000L, colStats)
 
-      override def getBoundedStream(streamEnv: StreamExecutionEnvironment): DataStream[Row] = {
-        streamEnv.createInput(
-          new CollectionInputFormat[Row](data3WithTimestamp,
-            type3WithTimestamp.createSerializer(env.getConfig)),
-          type3WithTimestamp)
+      override def getInputFormat: InputFormat[Row, _ <: InputSplit] = {
+        new CollectionInputFormat[Row](data3WithTimestamp,
+          type3WithTimestamp.createSerializer(env.getConfig))
       }
 
       override def getTableSchema: TableSchema = tableSchema
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableScanITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableScanITCase.scala
index 67a2f59..9970ffe 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableScanITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableScanITCase.scala
@@ -80,7 +80,7 @@ class TableScanITCase extends StreamingTestBase {
     val schema = new TableSchema(Array("name", "ptime"), Array(Types.STRING, Types.SQL_TIMESTAMP))
     val returnType = Types.STRING
 
-    val tableSource = new TestTableSourceWithTime(schema, returnType, data, null, "ptime")
+    val tableSource = new TestTableSourceWithTime(false, schema, returnType, data, null, "ptime")
     tEnv.registerTableSource(tableName, tableSource)
 
     val sqlQuery = s"SELECT name FROM $tableName"
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/InMemoryLookupableTableSource.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/InMemoryLookupableTableSource.scala
index 21cf7f8..1abcf2e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/InMemoryLookupableTableSource.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/InMemoryLookupableTableSource.scala
@@ -54,7 +54,6 @@ class InMemoryLookupableTableSource(
     lookupConfig: LookupConfig)
   extends LookupableTableSource[Row]
   with StreamTableSource[Row]
-  with BatchTableSource[Row]
   with DefinedPrimaryKey
   with DefinedIndexes {
 
@@ -124,10 +123,6 @@ class InMemoryLookupableTableSource(
   override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
     throw new UnsupportedOperationException("This should never be called.")
   }
-
-  override def getBoundedStream(streamEnv: StreamExecutionEnvironment): DataStream[Row] = {
-    throw new UnsupportedOperationException("This should never be called.")
-  }
 }
 
 object InMemoryLookupableTableSource {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
index 25fdc74..0d92a4d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
@@ -32,28 +32,28 @@ import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
 import org.apache.flink.table.plan.nodes.exec.ExecNode
 import org.apache.flink.table.plan.optimize.program.{FlinkBatchProgram, FlinkStreamProgram}
-import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable}
+import org.apache.flink.table.plan.schema.TableSourceTable
 import org.apache.flink.table.plan.stats.FlinkStatistic
 import org.apache.flink.table.plan.util.{ExecNodePlanDumper, FlinkRelOptUtil}
 import org.apache.flink.table.runtime.utils.{BatchTableEnvUtil, TestingAppendTableSink, TestingRetractTableSink, TestingUpsertTableSink}
-import org.apache.flink.table.sinks.{AppendStreamTableSink, CollectRowTableSink, RetractStreamTableSink, TableSink, UpsertStreamTableSink}
-import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource}
+import org.apache.flink.table.sinks._
+import org.apache.flink.table.sources.StreamTableSource
 import org.apache.flink.table.types.TypeInfoLogicalTypeConverter
 import org.apache.flink.table.types.TypeInfoLogicalTypeConverter.fromLogicalTypeToTypeInfo
 import org.apache.flink.table.types.logical.LogicalType
-import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
+import org.apache.flink.table.types.utils.TypeConversions
 import org.apache.flink.table.typeutils.BaseRowTypeInfo
 import org.apache.flink.types.Row
 
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.sql.SqlExplainLevel
+
 import org.apache.commons.lang3.SystemUtils
+
 import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.Rule
 import org.junit.rules.{ExpectedException, TestName}
 
-import _root_.java.util.{Set => JSet}
-
 import _root_.scala.collection.JavaConversions._
 
 /**
@@ -132,13 +132,10 @@ abstract class TableTestUtil(test: TableTestBase) {
       case at: AtomicType[_] => Array[TypeInformation[_]](at)
       case _ => throw new TableException(s"Unsupported type info: $typeInfo")
     }
+    val dataType = TypeConversions.fromLegacyInfoToDataType(typeInfo)
     val tableEnv = getTableEnv
-    val (fieldNames, _) = tableEnv.getFieldInfo(
-      fromLegacyInfoToDataType(typeInfo), fields.map(_.name).toArray)
-    val schema = new TableSchema(fieldNames, fieldTypes)
-    val tableSource = new TestTableSource(schema)
-    tableEnv.registerTableSource(name, tableSource)
-    tableEnv.scan(name)
+    val (fieldNames, _) = tableEnv.getFieldInfo(dataType, fields.map(_.name).toArray)
+    addTableSource(name, fieldTypes, fieldNames)
   }
 
   /**
@@ -147,14 +144,14 @@ abstract class TableTestUtil(test: TableTestBase) {
     *
     * @param name table name
     * @param types field types
-    * @param names field names
+    * @param fields field names
     * @param statistic statistic of current table
     * @return returns the registered [[Table]].
     */
   def addTableSource(
       name: String,
       types: Array[TypeInformation[_]],
-      names: Array[String],
+      fields: Array[String],
       statistic: FlinkStatistic = FlinkStatistic.UNKNOWN): Table
 
   /**
@@ -499,8 +496,8 @@ case class StreamTableTestUtil(test: TableTestBase) extends TableTestUtil(test)
       statistic: FlinkStatistic = FlinkStatistic.UNKNOWN): Table = {
     val tableEnv = getTableEnv
     val schema = new TableSchema(names, types)
-    val tableSource = new TestTableSource(schema)
-    val table = new StreamTableSourceTable[BaseRow](tableSource, statistic)
+    val tableSource = new TestTableSource(true, schema)
+    val table = new TableSourceTable[BaseRow](tableSource, true, statistic)
     tableEnv.registerTableInternal(name, table)
     tableEnv.scan(name)
   }
@@ -612,8 +609,8 @@ case class BatchTableTestUtil(test: TableTestBase) extends TableTestUtil(test) {
       statistic: FlinkStatistic = FlinkStatistic.UNKNOWN): Table = {
     val tableEnv = getTableEnv
     val schema = new TableSchema(names, types)
-    val tableSource = new TestTableSource(schema)
-    val table = new BatchTableSourceTable[BaseRow](tableSource, statistic)
+    val tableSource = new TestTableSource(true, schema)
+    val table = new TableSourceTable[BaseRow](tableSource, false, statistic)
     tableEnv.registerTableInternal(name, table)
     tableEnv.scan(name)
   }
@@ -647,14 +644,10 @@ case class BatchTableTestUtil(test: TableTestBase) extends TableTestUtil(test) {
 /**
   * Batch/Stream [[org.apache.flink.table.sources.TableSource]] for testing.
   */
-class TestTableSource(schema: TableSchema)
-  extends BatchTableSource[BaseRow]
-  with StreamTableSource[BaseRow] {
+class TestTableSource(isBatch: Boolean, schema: TableSchema)
+  extends StreamTableSource[BaseRow] {
 
-  override def getBoundedStream(
-      streamEnv: environment.StreamExecutionEnvironment): DataStream[BaseRow] = {
-    streamEnv.fromCollection(List[BaseRow](), getReturnType)
-  }
+  override def isBounded: Boolean = isBatch
 
   override def getDataStream(
       execEnv: environment.StreamExecutionEnvironment): DataStream[BaseRow] = {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
index a6f744a..6852e93 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
@@ -33,6 +33,7 @@ import java.util.Collections
 import scala.collection.JavaConversions._
 
 class TestTableSourceWithTime[T](
+    isBatch: Boolean,
     tableSchema: TableSchema,
     returnType: TypeInformation[T],
     values: Seq[T],
@@ -40,17 +41,14 @@ class TestTableSourceWithTime[T](
     proctime: String = null,
     mapping: Map[String, String] = null)
   extends StreamTableSource[T]
-  with BatchTableSource[T]
   with DefinedRowtimeAttributes
   with DefinedProctimeAttribute
   with DefinedFieldMapping {
 
-  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T] = {
-    execEnv.fromCollection(values, returnType)
-  }
+  override def isBounded: Boolean = isBatch
 
-  override def getBoundedStream(streamEnv: StreamExecutionEnvironment): DataStreamSource[T] = {
-    val dataStream = streamEnv.fromCollection(values, returnType)
+  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T] = {
+    val dataStream = execEnv.fromCollection(values, returnType)
     dataStream.getTransformation.setMaxParallelism(1)
     dataStream
   }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala
index b178d7a..2e6401d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala
@@ -40,9 +40,10 @@ import org.apache.flink.table.plan.rules.FlinkRuleSets
 import org.apache.flink.table.plan.schema._
 import org.apache.flink.table.runtime.MapRunner
 import org.apache.flink.table.sinks._
-import org.apache.flink.table.sources.{BatchTableSource, TableSource, TableSourceUtil}
+import org.apache.flink.table.sources.{BatchTableSource, InputFormatTableSource, TableSource, TableSourceUtil}
 import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
 import org.apache.flink.table.typeutils.FieldInfoUtils.{calculateTableSchema, getFieldsInfo, validateInputTypeInfo}
+import org.apache.flink.table.utils.TableConnectorUtils
 import org.apache.flink.types.Row
 
 /**
@@ -68,15 +69,18 @@ abstract class BatchTableEnvImpl(
   override protected def validateTableSource(tableSource: TableSource[_]): Unit = {
     TableSourceUtil.validateTableSource(tableSource)
 
-    if (!tableSource.isInstanceOf[BatchTableSource[_]]) {
-      throw new TableException("Only BatchTableSource can be registered in " +
-        "BatchTableEnvironment.")
+    if (!tableSource.isInstanceOf[BatchTableSource[_]] &&
+        !tableSource.isInstanceOf[InputFormatTableSource[_]]) {
+      throw new TableException("Only BatchTableSource and 
InputFormatTableSource can be registered " +
+        "in BatchTableEnvironment.")
     }
   }
 
   override protected  def validateTableSink(configuredSink: TableSink[_]): Unit = {
-    if (!configuredSink.isInstanceOf[BatchTableSink[_]]) {
-      throw new TableException("Only BatchTableSink can be registered in 
BatchTableEnvironment.")
+    if (!configuredSink.isInstanceOf[BatchTableSink[_]] &&
+        !configuredSink.isInstanceOf[BoundedTableSink[_]]) {
+      throw new TableException("Only BatchTableSink and BoundedTableSink can 
be registered " +
+        "in BatchTableEnvironment.")
     }
   }
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala
index 69b8164..9078560 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala
@@ -84,7 +84,7 @@ abstract class StreamTableEnvImpl(
     tableSource match {
 
       // check for proper stream table source
-      case streamTableSource: StreamTableSource[_] =>
+      case streamTableSource: StreamTableSource[_] if !streamTableSource.isBounded =>
         // check that event-time is enabled if table source includes rowtime attributes
         if (TableSourceUtil.hasRowtimeAttribute(streamTableSource) &&
           execEnv.getStreamTimeCharacteristic != TimeCharacteristic.EventTime) {
@@ -93,6 +93,10 @@ abstract class StreamTableEnvImpl(
                 s"environment. But is: ${execEnv.getStreamTimeCharacteristic}")
         }
 
+      case streamTableSource: StreamTableSource[_] if streamTableSource.isBounded =>
+        throw new TableException("Only unbounded StreamTableSource (isBounded returns false) " +
+          "can be registered in StreamTableEnvironment")
+
       // not a stream table source
       case _ =>
         throw new TableException("Only StreamTableSource can be registered in 
" +
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
index 7f4a5fb..19e0bda 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
@@ -23,13 +23,16 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.common.io.InputFormat
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
+import org.apache.flink.core.io.InputSplit
 import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvImpl, TableException, Types}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.sources._
-import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
+import org.apache.flink.table.types.utils.TypeConversions.{fromDataTypeToLegacyInfo, fromLegacyInfoToDataType}
 import org.apache.flink.types.Row
 
 /** Flink RelNode to read data from an external source defined by a [[BatchTableSource]]. */
@@ -37,7 +40,7 @@ class BatchTableSourceScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     table: RelOptTable,
-    tableSource: BatchTableSource[_],
+    tableSource: TableSource[_],
     selectedFields: Option[Array[Int]])
   extends PhysicalTableSourceScan(cluster, traitSet, table, tableSource, selectedFields)
   with BatchScan {
@@ -89,7 +92,21 @@ class BatchTableSourceScan(
       selectedFields)
 
     val config = tableEnv.getConfig
-    val inputDataSet = tableSource.getDataSet(tableEnv.execEnv).asInstanceOf[DataSet[Any]]
+    val inputDataSet = tableSource match {
+      case batchSource: BatchTableSource[_] =>
+        batchSource.getDataSet(tableEnv.execEnv)
+      case boundedSource: InputFormatTableSource[_] =>
+        val resultType = fromDataTypeToLegacyInfo(boundedSource.getProducedDataType)
+            .asInstanceOf[TypeInformation[Any]]
+        val inputFormat = boundedSource.getInputFormat
+          .asInstanceOf[InputFormat[Any, _ <: InputSplit]]
+        tableEnv.execEnv
+          .createInput(inputFormat, resultType)
+          .name(boundedSource.explainSource)
+          .asInstanceOf[DataSet[_]]
+      case _ => throw new TableException("Only BatchTableSource and InputFormatTableSource are " +
+        "supported in BatchTableEnvironment.")
+    }
     val outputSchema = new RowSchema(this.getRowType)
 
     val inputDataType = fromLegacyInfoToDataType(inputDataSet.getType)
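
Restated on its own (the method name here is hypothetical, imports as in the hunk above), the new InputFormatTableSource branch amounts to:

    def toDataSet[T](env: ExecutionEnvironment, source: InputFormatTableSource[T]): DataSet[T] = {
      // Convert the produced DataType back to a legacy TypeInformation.
      val typeInfo = fromDataTypeToLegacyInfo(source.getProducedDataType)
        .asInstanceOf[TypeInformation[T]]
      // Wire the bounded InputFormat into the DataSet API; explainSource()
      // provides a readable operator name in the plan.
      env.createInput(source.getInputFormat, typeInfo).name(source.explainSource())
    }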
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
index 8ce97ed..c51c99a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
@@ -50,7 +50,7 @@ class BatchTableSourceScanRule
       rel.getCluster,
       traitSet,
       scan.getTable,
-      scan.tableSource.asInstanceOf[BatchTableSource[_]],
+      scan.tableSource,
       scan.selectedFields
     )
   }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala
index fa405d6..e0dc5c3 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala
@@ -20,17 +20,16 @@ package org.apache.flink.table.api.validation
 
 import java.util
 import java.util.Collections
-
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.scala.StreamTableEnvironment
-import org.apache.flink.table.api.{TableSchema, Types, ValidationException}
+import org.apache.flink.table.api.{TableException, TableSchema, Types, ValidationException}
 import org.apache.flink.table.sources._
 import org.apache.flink.table.sources.tsextractors.ExistingField
 import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps
-import org.apache.flink.table.utils.TestTableSourceWithTime
+import org.apache.flink.table.utils.{TestInputFormatTableSource, TestTableSourceWithTime}
 import org.apache.flink.types.Row
 import org.junit.Test
 
@@ -245,4 +244,22 @@ class TableSourceValidationTest {
       // should fail, field can be empty
       .build()
   }
+
+  @Test(expected = classOf[TableException])
+  def testBoundedTableSourceForStreamEnv(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = StreamTableEnvironment.create(env)
+
+    val fieldNames = Array("id", "name")
+    val rowType = new RowTypeInfo(
+      Array(Types.LONG, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
+      fieldNames)
+    val schema = new TableSchema(
+      fieldNames,
+      Array(Types.LONG, Types.STRING))
+    val ts = new TestInputFormatTableSource(schema, rowType, Seq[Row]())
+
+    // should fail because InputFormatTableSource is not supported in StreamTableEnvironment
+    tEnv.registerTableSource("testTable", ts)
+  }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala
index b152dcb..eb9b31d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala
@@ -73,6 +73,39 @@ class TableSourceITCase(
   }
 
   @Test
+  def testBoundedTableSource(): Unit = {
+    val tableName = "MyTable"
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = BatchTableEnvironment.create(env)
+
+    val data = Seq(
+      Row.of("Mary", new JLong(1L), new JInt(10)),
+      Row.of("Bob", new JLong(2L), new JInt(20)),
+      Row.of("Mary", new JLong(2L), new JInt(30)),
+      Row.of("Liz", new JLong(2001L), new JInt(40)))
+
+    val fieldNames = Array("name", "rtime", "amount")
+    val schema = new TableSchema(fieldNames, Array(Types.STRING, Types.LONG, Types.INT))
+    val rowType = new RowTypeInfo(
+      Array(Types.STRING, Types.LONG, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
+      fieldNames)
+
+    val tableSource = new TestInputFormatTableSource(schema, rowType, data)
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val results = tEnv.scan(tableName)
+      .groupBy('name)
+      .select('name, 'amount.sum)
+      .collect()
+
+    val expected = Seq(
+      "Mary,40",
+      "Bob,20",
+      "Liz,40").mkString("\n")
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
   def testCsvTableSourceWithProjection(): Unit = {
     val csvTable = CommonTestData.getCsvTableSource
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/testTableSources.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/testTableSources.scala
index bbc155b..6f66158 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/testTableSources.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/testTableSources.scala
@@ -18,12 +18,13 @@
 
 package org.apache.flink.table.utils
 
-import java.util
-import java.util.{Collections, List => JList}
-
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.io.InputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.io.CollectionInputFormat
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.core.io.InputSplit
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.TableSchema
@@ -33,6 +34,9 @@ import org.apache.flink.table.sources.tsextractors.ExistingField
 import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, PreserveWatermarks}
 import org.apache.flink.types.Row
 
+import java.util
+import java.util.Collections
+
 import scala.collection.JavaConverters._
 
 class TestTableSourceWithTime[T](
@@ -225,3 +229,17 @@ class TestPreserveWMTableSource[T](
   override def getTableSchema: TableSchema = tableSchema
 
 }
+
+class TestInputFormatTableSource[T](
+    tableSchema: TableSchema,
+    returnType: TypeInformation[T],
+    values: Seq[T]) extends InputFormatTableSource[T] {
+
+  override def getInputFormat: InputFormat[T, _ <: InputSplit] = {
+    new CollectionInputFormat[T](values.asJava, returnType.createSerializer(new ExecutionConfig))
+  }
+
+  override def getReturnType: TypeInformation[T] = returnType
+
+  override def getTableSchema: TableSchema = tableSchema
+}
