leonardBang commented on a change in pull request #17749:
URL: https://github.com/apache/flink/pull/17749#discussion_r754140539



##########
File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala
##########
@@ -50,84 +52,151 @@ abstract class FsStreamingSinkITCaseBase extends 
StreamingTestBase {
 
   protected var resultPath: String = _
 
-  private val data = Seq(
-    Row.of(Integer.valueOf(1), "a", "b", "2020-05-03", "7"),
-    Row.of(Integer.valueOf(2), "p", "q", "2020-05-03", "8"),
-    Row.of(Integer.valueOf(3), "x", "y", "2020-05-03", "9"),
+  // iso date
+  private val data: Seq[Row] = Seq(
+    Row.of(Integer.valueOf(1), "a", "b", "2020-05-03", "07"),
+    Row.of(Integer.valueOf(2), "p", "q", "2020-05-03", "08"),
+    Row.of(Integer.valueOf(3), "x", "y", "2020-05-03", "09"),
     Row.of(Integer.valueOf(4), "x", "y", "2020-05-03", "10"),
     Row.of(Integer.valueOf(5), "x", "y", "2020-05-03", "11"))
 
+  // basic iso date
+  private val data2 = Seq(
+    Row.of(Integer.valueOf(1), "a", "b", "20200503", "07"),
+    Row.of(Integer.valueOf(2), "p", "q", "20200503", "08"),
+    Row.of(Integer.valueOf(3), "x", "y", "20200503", "09"),
+    Row.of(Integer.valueOf(4), "x", "y", "20200504", "10"),
+    Row.of(Integer.valueOf(5), "x", "y", "20200504", "11"))
+
   @Before
   override def before(): Unit = {
     super.before()
-    resultPath = tempFolder.newFolder().toURI.toString
 
     env.setParallelism(1)
     env.enableCheckpointing(100)
-
-    val stream = new DataStream(env.getJavaEnv.addSource(
-      new FiniteTestSource(data),
-      new RowTypeInfo(Types.INT, Types.STRING, Types.STRING, Types.STRING, 
Types.STRING)))
-
-    tEnv.createTemporaryView("my_table", stream, $("a"), $("b"), $("c"), 
$("d"), $("e"))
+    
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
   }
 
   def additionalProperties(): Array[String] = Array()
 
   @Test
   def testNonPart(): Unit = {
-    test(partition = false)
+    testIsoDate(partition = false)
   }
 
   @Test
   def testPart(): Unit = {
-    test(partition = true)
+    testIsoDate(partition = true)
     val basePath = new File(new URI(resultPath).getPath, "d=2020-05-03")
     Assert.assertEquals(5, basePath.list().length)
-    Assert.assertTrue(new File(new File(basePath, "e=7"), 
"_MY_SUCCESS").exists())
-    Assert.assertTrue(new File(new File(basePath, "e=8"), 
"_MY_SUCCESS").exists())
-    Assert.assertTrue(new File(new File(basePath, "e=9"), 
"_MY_SUCCESS").exists())
+    Assert.assertTrue(new File(new File(basePath, "e=07"), 
"_MY_SUCCESS").exists())
+    Assert.assertTrue(new File(new File(basePath, "e=08"), 
"_MY_SUCCESS").exists())
+    Assert.assertTrue(new File(new File(basePath, "e=09"), 
"_MY_SUCCESS").exists())
     Assert.assertTrue(new File(new File(basePath, "e=10"), 
"_MY_SUCCESS").exists())
     Assert.assertTrue(new File(new File(basePath, "e=11"), 
"_MY_SUCCESS").exists())
   }
 
-  private def test(partition: Boolean, policy: String = "success-file"): Unit 
= {
-    val dollar = '$'
-    val ddl = s"""
-                 |create table sink_table (
-                 |  a int,
-                 |  b string,
-                 |  c string,
-                 |  d string,
-                 |  e string
-                 |)
-                 |${if (partition) "partitioned by (d, e)" else ""}
-                 |with (
-                 |  'connector' = 'filesystem',
-                 |  'path' = '$resultPath',
-                 |  '${PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN.key()}' =
-                 |      '${dollar}d ${dollar}e:00:00',
-                 |  '${SINK_PARTITION_COMMIT_DELAY.key()}' = '1h',
-                 |  '${SINK_PARTITION_COMMIT_POLICY_KIND.key()}' = '$policy',
-                 |  '${SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME.key()}' = 
'_MY_SUCCESS',
-                 |  ${additionalProperties().mkString(",\n")}
-                 |)
-       """.stripMargin
-    tEnv.executeSql(ddl)
-
-    tEnv.sqlQuery("select * from my_table").executeInsert("sink_table").await()
-
-    check("select * from sink_table", data)
-  }
-
   @Test
   def testMetastorePolicy(): Unit = {
     thrown.expectMessage(
       "Can not configure a 'metastore' partition commit policy for a file 
system table." +
-          " You can only configure 'metastore' partition commit policy for a 
hive table.")
-    test(partition = true, "metastore")
+        " You can only configure 'metastore' partition commit policy for a 
hive table.")
+    testIsoDate(partition = true, "metastore")
+  }
+  
+  @Test
+  def testBasicDate(): Unit = {
+
+    // create source test data stream
+    val fun = (t: Row) => {
+      val localDateTime = LocalDateTime.of(
+        LocalDate.parse(t.getFieldAs[String](3), 
DateTimeFormatter.BASIC_ISO_DATE),
+        LocalTime.MIDNIGHT)
+      TimestampData.fromLocalDateTime(localDateTime).getMillisecond
+    }
+
+    val stream: DataStream[Row] = new DataStream(env.getJavaEnv.addSource(
+      new FiniteTestSource(data2, fun),
+      new RowTypeInfo(Types.INT, Types.STRING, Types.STRING, Types.STRING, 
Types.STRING)))
+
+    // write out the data
+    test(stream, "default", "yyyyMMdd", "$d", "d", "partition-time", "1d", 
data2)
+
+    // verify that the written data is correct
+    val basePath = new File(new URI(resultPath).getPath)
+    Assert.assertEquals(2, basePath.list().length)
+    Assert.assertTrue(new File(new File(basePath, "d=20200503"), 
"_MY_SUCCESS").exists())
+    Assert.assertTrue(new File(new File(basePath, "d=20200504"), 
"_MY_SUCCESS").exists())
+  }
+
+  def testIsoDate(partition: Boolean, policy: String = "success-file"): Unit = 
{
+
+    val fun = (t: Row) => {
+      val localDateTime = LocalDateTime.parse(s"${t.getField(3)} 
${t.getField(4)}:00:00",
+        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))

Review comment:
       Could you add a non-basic (e.g. ISO `yyyy-MM-dd`) format string here as well, to cover the formats affected by your change?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to