KurtYoung commented on a change in pull request #8844: 
[FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
URL: https://github.com/apache/flink/pull/8844#discussion_r301548092
 
 

 ##########
 File path: flink-python/pyflink/table/table_environment.py
 ##########
 @@ -235,6 +235,69 @@ def explain(self, table):
         """
         return self._j_tenv.explain(table._j_table)
 
+    def sql(self, query):
+        """
+        Evaluates a single SQL statement, including DDLs and DMLs.
+
+        Note: Always use this interface to execute a SQL statement. It only
+        supports executing one SQL statement at a time.
+
+        A DDL statement can be executed to create/drop a table/view:
+        For example, the DDL statement below would create a CSV table named
+        `tbl1` in the current catalog:
+
+        create table tbl1(
+            a int,
+            b bigint,
+            c varchar
+        ) with (
+            connector = 'csv',
+            csv.path = 'xxx'
+        )
+
+        The returned table format for different kinds of statements:
+        DDL: a table with one column of VARCHAR type describing whether the
+        operation succeeded.
+        DML: a SQL insert returns a table with one column of VARCHAR type
+        describing the affected rows; a SQL query (select) returns a table
+        describing the query result set, which can be further queried through
+        the Table API, or written directly to a sink with `~Table.insert_into`.
+
+        SQL queries can be executed directly as follows:
+        ::
+
+        >>> sinkDDL = '''
+        ...     create table sinkTable(
+        ...         a int,
+        ...         b varchar
+        ...     ) with (
+        ...         connector = 'csv',
+        ...         csv.path = 'xxx'
+        ...     )'''
+
+        >>> sourceDDL = '''
+        ...     create table sourceTable(
+        ...         a int,
+        ...         b varchar
+        ...     ) with (
+        ...         connector = 'kafka',
+        ...         kafka.topic = 'xxx',
+        ...         kafka.endpoint = 'x.x.x'
+        ...     )'''
+
+        >>> query = "INSERT INTO sinkTable SELECT * FROM sourceTable"
+
+        >>> tEnv.sql(sourceDDL)
+        >>> tEnv.sql(sinkDDL)
+        >>> tEnv.sql(query)
+        >>> tEnv.execute("MyJob")
+
+        This code snippet creates a job that reads data from a Kafka source
+        into a CSV sink.
+
+        :param query: The SQL statement to evaluate.
+        :return: The result table.
+        """
+        j_table = self._j_tenv.sql(query)
+        return Table(j_table)
 
 Review comment:
   Will this cause any problem if `self._j_tenv.sql(query)` returns NULL?
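  
  For illustration only, a minimal sketch of the kind of guard this question
  hints at, assuming py4j surfaces a Java null as Python None; the error
  message and the `Table` import path are assumptions, not code from this PR:
  
      from pyflink.table import Table
  
      def sql(self, query):
          j_table = self._j_tenv.sql(query)
          if j_table is None:
              # Hypothetical guard: without it, a null from the Java side
              # would only fail later, on first use of the wrapped Table.
              raise ValueError(
                  "No table returned for statement: %s" % query)
          return Table(j_table)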

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
