[ https://issues.apache.org/jira/browse/FLINK-9555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710993#comment-16710993 ]

ASF GitHub Bot commented on FLINK-9555:
---------------------------------------

asfgit closed pull request #7121: [FLINK-9555]Support table api in scala shell
URL: https://github.com/apache/flink/pull/7121
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a foreign pull request once it is merged, the diff is
reproduced below for the sake of provenance:

diff --git a/flink-scala-shell/pom.xml b/flink-scala-shell/pom.xml
index 3c9a563b30f..e7d31355b06 100644
--- a/flink-scala-shell/pom.xml
+++ b/flink-scala-shell/pom.xml
@@ -78,6 +78,13 @@ under the License.
                        <version>${scala.version}</version>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
                <!-- test dependencies -->
 
                <dependency>
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
index 4b6e886994a..c124d8ea8a3 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
@@ -23,6 +23,8 @@ import java.io.{BufferedReader, File, FileOutputStream}
 import org.apache.flink.api.java.{JarHelper, ScalaShellRemoteEnvironment, ScalaShellRemoteStreamEnvironment}
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment}
 import org.apache.flink.util.AbstractID
 
 import scala.tools.nsc.interpreter._
@@ -90,10 +92,17 @@ class FlinkILoop(
   }
 
   // local environment
-  val (scalaBenv: ExecutionEnvironment, scalaSenv: StreamExecutionEnvironment) = {
+  val (
+    scalaBenv: ExecutionEnvironment,
+    scalaSenv: StreamExecutionEnvironment,
+    scalaBTEnv: BatchTableEnvironment,
+    scalaSTEnv: StreamTableEnvironment
+    ) = {
     val scalaBenv = new ExecutionEnvironment(remoteBenv)
     val scalaSenv = new StreamExecutionEnvironment(remoteSenv)
-    (scalaBenv,scalaSenv)
+    val scalaBTEnv = TableEnvironment.getTableEnvironment(scalaBenv)
+    val scalaSTEnv = TableEnvironment.getTableEnvironment(scalaSenv)
+    (scalaBenv,scalaSenv,scalaBTEnv,scalaSTEnv)
   }
 
   /**
@@ -139,7 +148,10 @@ class FlinkILoop(
     "org.apache.flink.api.scala._",
     "org.apache.flink.api.scala.utils._",
     "org.apache.flink.streaming.api.scala._",
-    "org.apache.flink.streaming.api.windowing.time._"
+    "org.apache.flink.streaming.api.windowing.time._",
+    "org.apache.flink.table.api._",
+    "org.apache.flink.table.api.scala._",
+    "org.apache.flink.types.Row"
   )
 
   override def createInterpreter(): Unit = {
@@ -152,6 +164,8 @@ class FlinkILoop(
       // set execution environment
       intp.bind("benv", this.scalaBenv)
       intp.bind("senv", this.scalaSenv)
+      intp.bind("btenv", this.scalaBTEnv)
+      intp.bind("stenv", this.scalaSTEnv)
     }
   }
 
@@ -243,22 +257,29 @@ class FlinkILoop(
 
               F L I N K - S C A L A - S H E L L
 
-NOTE: Use the prebound Execution Environments to implement batch or streaming programs.
+NOTE: Use the prebound Execution Environments and Table Environment to implement batch or streaming programs.
 
-  Batch - Use the 'benv' variable
+  Batch - Use the 'benv' and 'btenv' variable
 
     * val dataSet = benv.readTextFile("/path/to/data")
     * dataSet.writeAsText("/path/to/output")
     * benv.execute("My batch program")
+    *
+    * val batchTable = btenv.fromDataSet(dataSet)
+    * btenv.registerTable("tableName", batchTable)
+    * val result = btenv.sqlQuery("SELECT * FROM tableName").collect
+    HINT: You can use print() on a DataSet to print the contents or collect()
+    a sql query result back to the shell.
 
-    HINT: You can use print() on a DataSet to print the contents to the shell.
-
-  Streaming - Use the 'senv' variable
+  Streaming - Use the 'senv' and 'stenv' variable
 
     * val dataStream = senv.fromElements(1, 2, 3, 4)
     * dataStream.countWindowAll(2).sum(0).print()
+    *
+    * val streamTable = stenv.fromDataStream(dataStream, 'num)
+    * val resultTable = streamTable.select('num).where('num % 2 === 1 )
+    * resultTable.toAppendStream[Row].print()
     * senv.execute("My streaming program")
-
     HINT: You can only print a DataStream to the shell in local mode.
       """
     // scalastyle:on
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index 337e4fb9be9..fc90d8d143c 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -168,6 +168,61 @@ class ScalaShellITCase extends TestLogger {
     Assert.assertTrue(output.contains("WC(world,10)"))
   }
 
+  @Test
+  def testSimpleSelectWithFilterBatchTableAPIQuery: Unit = {
+    val input =
+      """
+        |val data = Seq(
+        |    (1, 1L, "Hi"),
+        |    (2, 2L, "Hello"),
+        |    (3, 2L, "Hello world"))
+        |val t = benv.fromCollection(data).toTable(btenv, 'a, 'b, 'c).select('a,'c).where(
+        |'a% 2 === 1 )
+        |val results = t.toDataSet[Row].collect()
+        |results.foreach(println)
+        |:q
+      """.stripMargin
+    val output = processInShell(input)
+    Assert.assertFalse(output.contains("failed"))
+    Assert.assertFalse(output.contains("error"))
+    Assert.assertFalse(output.contains("Exception"))
+    Assert.assertTrue(output.contains("1,Hi"))
+    Assert.assertTrue(output.contains("3,Hello world"))
+  }
+
+  @Test
+  def testGroupedAggregationStreamTableAPIQuery: Unit = {
+    val input =
+      """
+        |  val data = List(
+        |    ("Hello", 1),
+        |    ("word", 1),
+        |    ("Hello", 1),
+        |    ("bark", 1),
+        |    ("bark", 1),
+        |    ("bark", 1),
+        |    ("bark", 1),
+        |    ("bark", 1),
+        |    ("bark", 1),
+        |    ("flink", 1)
+        |  )
+        | val stream = senv.fromCollection(data)
+        | val table = stream.toTable(stenv, 'word, 'num)
+        | val resultTable = table.groupBy('word).select('num.sum as 'count).groupBy('count).select(
+        | 'count,'count.count as 'frequency)
+        | val results = resultTable.toRetractStream[Row]
+        | results.print
+        | senv.execute
+      """.stripMargin
+    val output = processInShell(input)
+    Assert.assertTrue(output.contains("6,1"))
+    Assert.assertTrue(output.contains("1,2"))
+    Assert.assertTrue(output.contains("2,1"))
+    Assert.assertFalse(output.contains("failed"))
+    Assert.assertFalse(output.contains("error"))
+    Assert.assertFalse(output.contains("Exception"))
+  }
+
   /**
    * Submit external library.
    * Disabled due to FLINK-7111.
diff --git a/flink-scala-shell/start-script/start-scala-shell.sh b/flink-scala-shell/start-script/start-scala-shell.sh
index 033d5050322..e3571145f13 100644
--- a/flink-scala-shell/start-script/start-scala-shell.sh
+++ b/flink-scala-shell/start-script/start-scala-shell.sh
@@ -52,6 +52,13 @@ bin=`cd "$bin"; pwd`
 
 FLINK_CLASSPATH=`constructFlinkClassPath`
 
+# Append flink-table jar into class path
+opt=`dirname "$0"`
+opt=`cd ../"$opt"/opt; pwd`
+FLINK_TABLE_LIB_PATH=$opt/`ls $opt|grep flink-table_*`
+FLINK_CLASSPATH=$FLINK_CLASSPATH:$FLINK_TABLE_LIB_PATH
+
+
 # https://issues.scala-lang.org/browse/SI-6502, cant load external jars interactively
 # in scala shell since 2.10, has to be done at startup
 # checks arguments for additional classpath and adds it to the "standard classpath"


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Support table api in scala shell
> --------------------------------
>
>                 Key: FLINK-9555
>                 URL: https://issues.apache.org/jira/browse/FLINK-9555
>             Project: Flink
>          Issue Type: New Feature
>          Components: Scala Shell
>    Affects Versions: 1.5.0
>            Reporter: Jeff Zhang
>            Assignee: shuiqiangchen
>            Priority: Major
>              Labels: pull-request-available
>
> It would be nice to have the Table API available in the Scala shell so that
> users can experience the Table API in an interactive way.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
