leonardBang commented on a change in pull request #17749:
URL: https://github.com/apache/flink/pull/17749#discussion_r754021984



##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
##########
@@ -297,7 +300,22 @@ public HiveContinuousPartitionFetcherContext(
                     if (configuration.contains(STREAMING_SOURCE_CONSUME_START_OFFSET)) {
                         String consumeOffsetStr =
                                 configuration.getString(STREAMING_SOURCE_CONSUME_START_OFFSET);
-                        consumeStartOffset = (T) Long.valueOf(toMills(consumeOffsetStr));
+
+                        LocalDateTime localDateTime =
+                                configuration.getString(PARTITION_TIME_EXTRACTOR_FORMATTER_PATTEN)
+                                                == null
+                                        ? DefaultPartTimeExtractor.toLocalDateTimeDefault(
+                                                consumeOffsetStr)
+                                        : DefaultPartTimeExtractor.toLocalDateTime(
+                                                consumeOffsetStr,
+                                                configuration.getString(
+                                                        PARTITION_TIME_EXTRACTOR_FORMATTER_PATTEN));
+

Review comment:
       `DefaultPartTimeExtractor.toLocalDateTimeDefault()` and `DefaultPartTimeExtractor.toLocalDateTime()` can be unified into a single method: `DefaultPartTimeExtractor.toLocalDateTime(String timestampString, @Nullable String formatterPattern)`.
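       For illustration, a minimal sketch of what the unified method could look like. This is not the PR's code: the class is simplified, and `parseBest` is an assumed way to accept both date-only and date-time patterns.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;
import javax.annotation.Nullable;

public class PartTimeExtractorSketch {

    // Assumed default, matching the documented 'yyyy-MM-dd HH:mm:ss' pattern.
    private static final DateTimeFormatter DEFAULT_FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Parses a timestamp string, falling back to the default pattern when none is configured. */
    public static LocalDateTime toLocalDateTime(
            String timestampString, @Nullable String formatterPattern) {
        DateTimeFormatter formatter =
                formatterPattern == null
                        ? DEFAULT_FORMATTER
                        : DateTimeFormatter.ofPattern(formatterPattern);
        // Accept both full timestamps and date-only values such as '2020-05-03'.
        TemporalAccessor parsed =
                formatter.parseBest(timestampString, LocalDateTime::from, LocalDate::from);
        return parsed instanceof LocalDateTime
                ? (LocalDateTime) parsed
                : ((LocalDate) parsed).atStartOfDay();
    }
}
```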

##########
File path: docs/content/docs/connectors/table/filesystem.md
##########
@@ -328,15 +328,30 @@ Time extractors define extracting time from partition values.
         <td>String</td>
         <td>The extractor class for implement PartitionTimeExtractor interface.</td>
     </tr>
+    <tr>
+        <td><h5>partition.time-extractor.formatter-pattern</h5></td>
+        <td style="word-wrap: break-word;">yyyy-MM-dd&nbsp;HH:mm:ss</td>
+        <td>String</td>
+        <td>When 'partition.time-extractor.kind' is set to 'default', you can specify a formatter pattern to get a timestamp from partitions. <br>By default, the format is 'yyyy-MM-dd&nbsp;HH:mm:ss'.
+            <br>Examples as follows:
+            <br>'yyyy-MM-dd' -> '2018-07-14'
+            <br>'dd-MMM-yyyy' -> '14-Jul-2018'
+            <br>'E, MMM dd yyyy' -> 'Sat, Jul 14 2018'
+            <br>More details: https://www.w3.org/QA/Tips/iso-date</td>
+    </tr>
     <tr>
         <td><h5>partition.time-extractor.timestamp-pattern</h5></td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>String</td>
-        <td>The 'default' construction way allows users to use partition fields to get a legal timestamp pattern. Default support 'yyyy-mm-dd hh:mm:ss' from first field. If timestamp should be extracted from a single partition field 'dt', can configure: '$dt'. If timestamp should be extracted from multiple partition fields, say 'year', 'month', 'day' and 'hour', can configure: '$year-$month-$day $hour:00:00'. If timestamp should be extracted from two partition fields 'dt' and 'hour', can configure: '$dt $hour:00:00'.</td>
+        <td>The 'default' construction way allows users to use partition fields to get a legal timestamp pattern. Default support 'yyyy-MM-dd HH:mm:ss' from first field. If timestamp should be extracted from a single partition field 'dt', can configure: '$dt'. If timestamp should be extracted from multiple partition fields, say 'year', 'month', 'day' and 'hour', can configure: '$year-$month-$day $hour:00:00'. If timestamp should be extracted from two partition fields 'dt' and 'hour', can configure: '$dt $hour:00:00'.</td>
     </tr>
   </tbody>
 </table>
 
+
+
+
+

Review comment:
       Please delete these trailing blank lines.

##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HivePartitionFetcherContextBase.java
##########
@@ -167,7 +169,11 @@ public void open() throws Exception {
                                 Short.MAX_VALUE);
                 for (String partitionName : partitionNames) {
                     List<String> partValues = extractPartitionValues(partitionName);
-                    Long partitionTime = toMills(extractor.extract(partitionKeys, partValues));
+
+                    Long partitionTime =
+                            TimestampData.fromLocalDateTime(
+                                            extractor.extract(partitionKeys, partValues))
+                                    .getMillisecond();

Review comment:
       I think the original version is more concise?
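       For example, a hedged sketch of keeping the call site on one line with a small private helper; the helper name mirrors the removed `toMills`, but this is not necessarily Flink's actual code:

```java
import java.time.LocalDateTime;

import org.apache.flink.table.data.TimestampData;

class PartitionTimeHelper {

    // Wraps the TimestampData conversion from the PR so the call site stays brief:
    //   Long partitionTime = toMills(extractor.extract(partitionKeys, partValues));
    static long toMills(LocalDateTime dateTime) {
        return TimestampData.fromLocalDateTime(dateTime).getMillisecond();
    }
}
```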

##########
File path: docs/content.zh/docs/connectors/table/filesystem.md
##########
@@ -276,18 +276,31 @@ file sink 支持文件合并,以允许应用程序可以使用较小的检查
         <td>String</td>
         <td>实现了接口 PartitionTimeExtractor 的提取器类.</td>
     </tr>
+    <tr>
+        <td><h5>partition.time-extractor.formatter-pattern</h5></td>
+        <td style="word-wrap: break-word;">yyyy-MM-dd&nbsp;HH:mm:ss</td>
+        <td>String</td>
+        <td>指定时间格式化的类型。当'partition.time-extractor.kind'设置为'default'时,你可以指定分区字段的时间格式化类型。 <br>它的默认格式化类型是:'yyyy-MM-dd HH:mm:ss'.
+            <br>示例如下:
+            <br>'yyyy-MM-dd' -> '2018-07-14'
+            <br>'dd-MMM-yyyy' -> '14-Jul-2018'
+            <br>'E, MMM dd yyyy' -> 'Sat, Jul 14 2018'
+            <br>更多细节可参考: https://www.w3.org/QA/Tips/iso-date</td>
+    </tr>
     <tr>
         <td><h5>partition.time-extractor.timestamp-pattern</h5></td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>String</td>
-        <td> 'default' 时间提取器允许用户从分区字段中提取合法的时间戳模式。默认支持从第一个字段按 'yyyy-mm-dd hh:mm:ss' 时间戳模式提取。
+        <td> 'default' 时间提取器允许用户从分区字段中提取合法的时间戳模式。默认支持从第一个字段按 'yyyy-MM-dd HH:mm:ss' 时间戳模式提取。
         如果需要从一个分区字段比如 ‘dt’ 提取时间戳,可以配置为: '$dt';
         如果需要从多个分区字段,比如 'year', 'month', 'day' 和 'hour'提取时间戳,可以配置为:'$year-$month-$day $hour:00:00';
         如果需要从两字分区字段,比如 'dt' 和 'hour' 提取时间戳,可以配置为:'$dt $hour:00:00'.</td>
     </tr>
   </tbody>
 </table>
 
+
+
 默认的提取器是基于由分区字段组合而成的时间戳模式。你也可以指定一个实现了 `PartitionTimeExtractor` 接口的自定义的提取器。

Review comment:
       Please delete these trailing blank lines.

##########
File path: docs/content/docs/connectors/table/filesystem.md
##########
@@ -328,15 +328,30 @@ Time extractors define extracting time from partition values.
         <td>String</td>
         <td>The extractor class for implement PartitionTimeExtractor interface.</td>
     </tr>
+    <tr>
+        <td><h5>partition.time-extractor.formatter-pattern</h5></td>
+        <td style="word-wrap: break-word;">yyyy-MM-dd&nbsp;HH:mm:ss</td>
+        <td>String</td>
+        <td>When 'partition.time-extractor.kind' is set to 'default', you can specify a formatter pattern to get a timestamp from partitions. <br>By default, the format is 'yyyy-MM-dd&nbsp;HH:mm:ss'.
+            <br>Examples as follows:
+            <br>'yyyy-MM-dd' -> '2018-07-14'
+            <br>'dd-MMM-yyyy' -> '14-Jul-2018'
+            <br>'E, MMM dd yyyy' -> 'Sat, Jul 14 2018'
+            <br>More details: https://www.w3.org/QA/Tips/iso-date</td>

Review comment:
       The timestamp-formatter is compatible with Java's [DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/index.html).
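       For illustration, a plain-JDK sketch of how the documented example patterns behave with `DateTimeFormatter` (the class name is made up; an explicit `Locale` is assumed so 'MMM' and 'E' render in English):

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

public class FormatterPatternDemo {
    public static void main(String[] args) {
        LocalDate date = LocalDate.of(2018, 7, 14);

        // The three example patterns from the documentation table above.
        System.out.println(date.format(
                DateTimeFormatter.ofPattern("yyyy-MM-dd")));                     // 2018-07-14
        System.out.println(date.format(
                DateTimeFormatter.ofPattern("dd-MMM-yyyy", Locale.ENGLISH)));    // 14-Jul-2018
        System.out.println(date.format(
                DateTimeFormatter.ofPattern("E, MMM dd yyyy", Locale.ENGLISH))); // Sat, Jul 14 2018
    }
}
```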

##########
File path: docs/content/docs/connectors/table/filesystem.md
##########
@@ -328,15 +328,30 @@ Time extractors define extracting time from partition values.
         <td>String</td>
         <td>The extractor class for implement PartitionTimeExtractor interface.</td>
     </tr>
+    <tr>
+        <td><h5>partition.time-extractor.formatter-pattern</h5></td>
+        <td style="word-wrap: break-word;">yyyy-MM-dd&nbsp;HH:mm:ss</td>
+        <td>String</td>
+        <td>When 'partition.time-extractor.kind' is set to 'default', you can specify a formatter pattern to get a timestamp from partitions. <br>By default, the format is 'yyyy-MM-dd&nbsp;HH:mm:ss'.

Review comment:
       How about the name `timestamp-formatter`, with the default value 'yyyy-MM-dd&nbsp;HH:mm:ss'? It could be combined with `partition.time-extractor.timestamp-pattern`, and it should also support multiple partition fields like ```$year$month$day```.
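       A hedged sketch of how that combination could work: first substitute the partition values into the timestamp pattern, then parse the result with the configured formatter. The class and helper below are hypothetical, not code from this PR:

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;

public class PatternPlusFormatterSketch {

    // Replaces each $field placeholder in the timestamp pattern with its partition value.
    static String resolve(String timestampPattern, Map<String, String> partitionValues) {
        String resolved = timestampPattern;
        for (Map.Entry<String, String> e : partitionValues.entrySet()) {
            resolved = resolved.replace("$" + e.getKey(), e.getValue());
        }
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, String> partitions = new LinkedHashMap<>();
        partitions.put("year", "2020");
        partitions.put("month", "05");
        partitions.put("day", "03");

        // timestamp-pattern '$year$month$day' combined with formatter 'yyyyMMdd'.
        String raw = resolve("$year$month$day", partitions);
        LocalDate date = LocalDate.parse(raw, DateTimeFormatter.ofPattern("yyyyMMdd"));
        System.out.println(date); // 2020-05-03
    }
}
```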

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala
##########
@@ -50,84 +52,151 @@ abstract class FsStreamingSinkITCaseBase extends StreamingTestBase {
 
   protected var resultPath: String = _
 
-  private val data = Seq(
-    Row.of(Integer.valueOf(1), "a", "b", "2020-05-03", "7"),
-    Row.of(Integer.valueOf(2), "p", "q", "2020-05-03", "8"),
-    Row.of(Integer.valueOf(3), "x", "y", "2020-05-03", "9"),
+  // iso date
+  private val data: Seq[Row] = Seq(
+    Row.of(Integer.valueOf(1), "a", "b", "2020-05-03", "07"),
+    Row.of(Integer.valueOf(2), "p", "q", "2020-05-03", "08"),
+    Row.of(Integer.valueOf(3), "x", "y", "2020-05-03", "09"),
     Row.of(Integer.valueOf(4), "x", "y", "2020-05-03", "10"),
     Row.of(Integer.valueOf(5), "x", "y", "2020-05-03", "11"))
 
+  // basic iso date
+  private val data2 = Seq(
+    Row.of(Integer.valueOf(1), "a", "b", "20200503", "07"),
+    Row.of(Integer.valueOf(2), "p", "q", "20200503", "08"),
+    Row.of(Integer.valueOf(3), "x", "y", "20200503", "09"),
+    Row.of(Integer.valueOf(4), "x", "y", "20200504", "10"),
+    Row.of(Integer.valueOf(5), "x", "y", "20200504", "11"))
+
   @Before
   override def before(): Unit = {
     super.before()
-    resultPath = tempFolder.newFolder().toURI.toString
 
     env.setParallelism(1)
     env.enableCheckpointing(100)
-
-    val stream = new DataStream(env.getJavaEnv.addSource(
-      new FiniteTestSource(data),
-      new RowTypeInfo(Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING)))
-
-    tEnv.createTemporaryView("my_table", stream, $("a"), $("b"), $("c"), $("d"), $("e"))
+    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
   }
 
   def additionalProperties(): Array[String] = Array()
 
   @Test
   def testNonPart(): Unit = {
-    test(partition = false)
+    testIsoDate(partition = false)
   }
 
   @Test
   def testPart(): Unit = {
-    test(partition = true)
+    testIsoDate(partition = true)
     val basePath = new File(new URI(resultPath).getPath, "d=2020-05-03")
     Assert.assertEquals(5, basePath.list().length)
-    Assert.assertTrue(new File(new File(basePath, "e=7"), "_MY_SUCCESS").exists())
-    Assert.assertTrue(new File(new File(basePath, "e=8"), "_MY_SUCCESS").exists())
-    Assert.assertTrue(new File(new File(basePath, "e=9"), "_MY_SUCCESS").exists())
+    Assert.assertTrue(new File(new File(basePath, "e=07"), "_MY_SUCCESS").exists())
+    Assert.assertTrue(new File(new File(basePath, "e=08"), "_MY_SUCCESS").exists())
+    Assert.assertTrue(new File(new File(basePath, "e=09"), "_MY_SUCCESS").exists())
     Assert.assertTrue(new File(new File(basePath, "e=10"), "_MY_SUCCESS").exists())
     Assert.assertTrue(new File(new File(basePath, "e=11"), "_MY_SUCCESS").exists())
   }
 
-  private def test(partition: Boolean, policy: String = "success-file"): Unit = {
-    val dollar = '$'
-    val ddl = s"""
-                 |create table sink_table (
-                 |  a int,
-                 |  b string,
-                 |  c string,
-                 |  d string,
-                 |  e string
-                 |)
-                 |${if (partition) "partitioned by (d, e)" else ""}
-                 |with (
-                 |  'connector' = 'filesystem',
-                 |  'path' = '$resultPath',
-                 |  '${PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN.key()}' =
-                 |      '${dollar}d ${dollar}e:00:00',
-                 |  '${SINK_PARTITION_COMMIT_DELAY.key()}' = '1h',
-                 |  '${SINK_PARTITION_COMMIT_POLICY_KIND.key()}' = '$policy',
-                 |  '${SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME.key()}' = '_MY_SUCCESS',
-                 |  ${additionalProperties().mkString(",\n")}
-                 |)
-       """.stripMargin
-    tEnv.executeSql(ddl)
-
-    tEnv.sqlQuery("select * from my_table").executeInsert("sink_table").await()
-
-    check("select * from sink_table", data)
-  }
-
   @Test
   def testMetastorePolicy(): Unit = {
     thrown.expectMessage(
       "Can not configure a 'metastore' partition commit policy for a file 
system table." +
-          " You can only configure 'metastore' partition commit policy for a 
hive table.")
-    test(partition = true, "metastore")
+        " You can only configure 'metastore' partition commit policy for a 
hive table.")
+    testIsoDate(partition = true, "metastore")
+  }
+  
+  @Test
+  def testBasicDate(): Unit = {
+
+    // create source test data stream
+    val fun = (t: Row) => {
+      val localDateTime = LocalDateTime.of(
+        LocalDate.parse(t.getFieldAs[String](3), DateTimeFormatter.BASIC_ISO_DATE),
+        LocalTime.MIDNIGHT)
+      TimestampData.fromLocalDateTime(localDateTime).getMillisecond
+    }
+
+    val stream: DataStream[Row] = new DataStream(env.getJavaEnv.addSource(
+      new FiniteTestSource(data2, fun),
+      new RowTypeInfo(Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING)))
+
+    // write out the data
+    test(stream, "default", "yyyyMMdd", "$d", "d", "partition-time", "1d", data2)
+
+    // verify that the written data is correct
+    val basePath = new File(new URI(resultPath).getPath)
+    Assert.assertEquals(2, basePath.list().length)
+    Assert.assertTrue(new File(new File(basePath, "d=20200503"), "_MY_SUCCESS").exists())
+    Assert.assertTrue(new File(new File(basePath, "d=20200504"), "_MY_SUCCESS").exists())
+  }
+
+  def testIsoDate(partition: Boolean, policy: String = "success-file"): Unit = {

Review comment:
       How about `testPartitionCustomFormatDate`?

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala
##########
@@ -139,7 +208,8 @@ abstract class FsStreamingSinkITCaseBase extends StreamingTestBase {
   }
 }
 
-class FiniteTestSource(elements: Iterable[Row]) extends SourceFunction[Row] with CheckpointListener{
+class FiniteTestSource(elements: Iterable[Row], fun: Row => Long) extends SourceFunction[Row]

Review comment:
       Hint: use a meaningful name here, e.g. rename `fun` to `watermarkGenerator`.

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala
##########
@@ -50,84 +52,151 @@ abstract class FsStreamingSinkITCaseBase extends StreamingTestBase {
 
   protected var resultPath: String = _
 
-  private val data = Seq(
-    Row.of(Integer.valueOf(1), "a", "b", "2020-05-03", "7"),
-    Row.of(Integer.valueOf(2), "p", "q", "2020-05-03", "8"),
-    Row.of(Integer.valueOf(3), "x", "y", "2020-05-03", "9"),
+  // iso date
+  private val data: Seq[Row] = Seq(
+    Row.of(Integer.valueOf(1), "a", "b", "2020-05-03", "07"),
+    Row.of(Integer.valueOf(2), "p", "q", "2020-05-03", "08"),
+    Row.of(Integer.valueOf(3), "x", "y", "2020-05-03", "09"),
     Row.of(Integer.valueOf(4), "x", "y", "2020-05-03", "10"),
     Row.of(Integer.valueOf(5), "x", "y", "2020-05-03", "11"))
 
+  // basic iso date
+  private val data2 = Seq(
+    Row.of(Integer.valueOf(1), "a", "b", "20200503", "07"),
+    Row.of(Integer.valueOf(2), "p", "q", "20200503", "08"),
+    Row.of(Integer.valueOf(3), "x", "y", "20200503", "09"),
+    Row.of(Integer.valueOf(4), "x", "y", "20200504", "10"),
+    Row.of(Integer.valueOf(5), "x", "y", "20200504", "11"))
+
   @Before
   override def before(): Unit = {
     super.before()
-    resultPath = tempFolder.newFolder().toURI.toString
 
     env.setParallelism(1)
     env.enableCheckpointing(100)
-
-    val stream = new DataStream(env.getJavaEnv.addSource(
-      new FiniteTestSource(data),
-      new RowTypeInfo(Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING)))
-
-    tEnv.createTemporaryView("my_table", stream, $("a"), $("b"), $("c"), $("d"), $("e"))
+    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
   }
 
   def additionalProperties(): Array[String] = Array()
 
   @Test
   def testNonPart(): Unit = {
-    test(partition = false)
+    testIsoDate(partition = false)
   }
 
   @Test
   def testPart(): Unit = {
-    test(partition = true)
+    testIsoDate(partition = true)
     val basePath = new File(new URI(resultPath).getPath, "d=2020-05-03")
     Assert.assertEquals(5, basePath.list().length)
-    Assert.assertTrue(new File(new File(basePath, "e=7"), "_MY_SUCCESS").exists())
-    Assert.assertTrue(new File(new File(basePath, "e=8"), "_MY_SUCCESS").exists())
-    Assert.assertTrue(new File(new File(basePath, "e=9"), "_MY_SUCCESS").exists())
+    Assert.assertTrue(new File(new File(basePath, "e=07"), "_MY_SUCCESS").exists())
+    Assert.assertTrue(new File(new File(basePath, "e=08"), "_MY_SUCCESS").exists())
+    Assert.assertTrue(new File(new File(basePath, "e=09"), "_MY_SUCCESS").exists())
     Assert.assertTrue(new File(new File(basePath, "e=10"), "_MY_SUCCESS").exists())
     Assert.assertTrue(new File(new File(basePath, "e=11"), "_MY_SUCCESS").exists())
   }
 
-  private def test(partition: Boolean, policy: String = "success-file"): Unit = {
-    val dollar = '$'
-    val ddl = s"""
-                 |create table sink_table (
-                 |  a int,
-                 |  b string,
-                 |  c string,
-                 |  d string,
-                 |  e string
-                 |)
-                 |${if (partition) "partitioned by (d, e)" else ""}
-                 |with (
-                 |  'connector' = 'filesystem',
-                 |  'path' = '$resultPath',
-                 |  '${PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN.key()}' =
-                 |      '${dollar}d ${dollar}e:00:00',
-                 |  '${SINK_PARTITION_COMMIT_DELAY.key()}' = '1h',
-                 |  '${SINK_PARTITION_COMMIT_POLICY_KIND.key()}' = '$policy',
-                 |  '${SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME.key()}' = '_MY_SUCCESS',
-                 |  ${additionalProperties().mkString(",\n")}
-                 |)
-       """.stripMargin
-    tEnv.executeSql(ddl)
-
-    tEnv.sqlQuery("select * from my_table").executeInsert("sink_table").await()
-
-    check("select * from sink_table", data)
-  }
-
   @Test
   def testMetastorePolicy(): Unit = {
     thrown.expectMessage(
       "Can not configure a 'metastore' partition commit policy for a file 
system table." +
-          " You can only configure 'metastore' partition commit policy for a 
hive table.")
-    test(partition = true, "metastore")
+        " You can only configure 'metastore' partition commit policy for a 
hive table.")
+    testIsoDate(partition = true, "metastore")
+  }
+  
+  @Test
+  def testBasicDate(): Unit = {

Review comment:
       How about `testPartitionWithBasicDate`?

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala
##########
@@ -50,84 +52,151 @@ abstract class FsStreamingSinkITCaseBase extends StreamingTestBase {
 
   protected var resultPath: String = _
 
-  private val data = Seq(
-    Row.of(Integer.valueOf(1), "a", "b", "2020-05-03", "7"),
-    Row.of(Integer.valueOf(2), "p", "q", "2020-05-03", "8"),
-    Row.of(Integer.valueOf(3), "x", "y", "2020-05-03", "9"),
+  // iso date
+  private val data: Seq[Row] = Seq(
+    Row.of(Integer.valueOf(1), "a", "b", "2020-05-03", "07"),
+    Row.of(Integer.valueOf(2), "p", "q", "2020-05-03", "08"),
+    Row.of(Integer.valueOf(3), "x", "y", "2020-05-03", "09"),
     Row.of(Integer.valueOf(4), "x", "y", "2020-05-03", "10"),
     Row.of(Integer.valueOf(5), "x", "y", "2020-05-03", "11"))
 
+  // basic iso date
+  private val data2 = Seq(
+    Row.of(Integer.valueOf(1), "a", "b", "20200503", "07"),
+    Row.of(Integer.valueOf(2), "p", "q", "20200503", "08"),
+    Row.of(Integer.valueOf(3), "x", "y", "20200503", "09"),
+    Row.of(Integer.valueOf(4), "x", "y", "20200504", "10"),
+    Row.of(Integer.valueOf(5), "x", "y", "20200504", "11"))
+
   @Before
   override def before(): Unit = {
     super.before()
-    resultPath = tempFolder.newFolder().toURI.toString
 
     env.setParallelism(1)
     env.enableCheckpointing(100)
-
-    val stream = new DataStream(env.getJavaEnv.addSource(
-      new FiniteTestSource(data),
-      new RowTypeInfo(Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING)))
-
-    tEnv.createTemporaryView("my_table", stream, $("a"), $("b"), $("c"), $("d"), $("e"))
+    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
   }
 
   def additionalProperties(): Array[String] = Array()
 
   @Test
   def testNonPart(): Unit = {
-    test(partition = false)
+    testIsoDate(partition = false)
   }
 
   @Test
   def testPart(): Unit = {
-    test(partition = true)
+    testIsoDate(partition = true)
     val basePath = new File(new URI(resultPath).getPath, "d=2020-05-03")
     Assert.assertEquals(5, basePath.list().length)
-    Assert.assertTrue(new File(new File(basePath, "e=7"), "_MY_SUCCESS").exists())
-    Assert.assertTrue(new File(new File(basePath, "e=8"), "_MY_SUCCESS").exists())
-    Assert.assertTrue(new File(new File(basePath, "e=9"), "_MY_SUCCESS").exists())
+    Assert.assertTrue(new File(new File(basePath, "e=07"), "_MY_SUCCESS").exists())
+    Assert.assertTrue(new File(new File(basePath, "e=08"), "_MY_SUCCESS").exists())
+    Assert.assertTrue(new File(new File(basePath, "e=09"), "_MY_SUCCESS").exists())
     Assert.assertTrue(new File(new File(basePath, "e=10"), "_MY_SUCCESS").exists())
     Assert.assertTrue(new File(new File(basePath, "e=11"), "_MY_SUCCESS").exists())
   }
 
-  private def test(partition: Boolean, policy: String = "success-file"): Unit = {
-    val dollar = '$'
-    val ddl = s"""
-                 |create table sink_table (
-                 |  a int,
-                 |  b string,
-                 |  c string,
-                 |  d string,
-                 |  e string
-                 |)
-                 |${if (partition) "partitioned by (d, e)" else ""}
-                 |with (
-                 |  'connector' = 'filesystem',
-                 |  'path' = '$resultPath',
-                 |  '${PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN.key()}' =
-                 |      '${dollar}d ${dollar}e:00:00',
-                 |  '${SINK_PARTITION_COMMIT_DELAY.key()}' = '1h',
-                 |  '${SINK_PARTITION_COMMIT_POLICY_KIND.key()}' = '$policy',
-                 |  '${SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME.key()}' = '_MY_SUCCESS',
-                 |  ${additionalProperties().mkString(",\n")}
-                 |)
-       """.stripMargin
-    tEnv.executeSql(ddl)
-
-    tEnv.sqlQuery("select * from my_table").executeInsert("sink_table").await()
-
-    check("select * from sink_table", data)
-  }
-
   @Test
   def testMetastorePolicy(): Unit = {
     thrown.expectMessage(
       "Can not configure a 'metastore' partition commit policy for a file 
system table." +
-          " You can only configure 'metastore' partition commit policy for a 
hive table.")
-    test(partition = true, "metastore")
+        " You can only configure 'metastore' partition commit policy for a 
hive table.")
+    testIsoDate(partition = true, "metastore")
+  }
+  
+  @Test
+  def testBasicDate(): Unit = {
+
+    // create source test data stream
+    val fun = (t: Row) => {
+      val localDateTime = LocalDateTime.of(
+        LocalDate.parse(t.getFieldAs[String](3), DateTimeFormatter.BASIC_ISO_DATE),
+        LocalTime.MIDNIGHT)
+      TimestampData.fromLocalDateTime(localDateTime).getMillisecond
+    }
+
+    val stream: DataStream[Row] = new DataStream(env.getJavaEnv.addSource(
+      new FiniteTestSource(data2, fun),
+      new RowTypeInfo(Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING)))
+
+    // write out the data
+    test(stream, "default", "yyyyMMdd", "$d", "d", "partition-time", "1d", data2)
+
+    // verify that the written data is correct
+    val basePath = new File(new URI(resultPath).getPath)
+    Assert.assertEquals(2, basePath.list().length)
+    Assert.assertTrue(new File(new File(basePath, "d=20200503"), "_MY_SUCCESS").exists())
+    Assert.assertTrue(new File(new File(basePath, "d=20200504"), "_MY_SUCCESS").exists())
+  }
+
+  def testIsoDate(partition: Boolean, policy: String = "success-file"): Unit = {
+
+    val fun = (t: Row) => {
+      val localDateTime = LocalDateTime.parse(s"${t.getField(3)} ${t.getField(4)}:00:00",
+        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))

Review comment:
       Could you give a non-basic format string here, to cover the change you made?
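       For instance, a non-ISO pattern such as 'dd-MMM-yyyy' would exercise the new formatter-pattern path. A JDK-only sketch of what such partition values would look like (illustrative, not test code from the PR):

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

public class NonBasicFormatDemo {
    public static void main(String[] args) {
        // A custom, non-ISO partition format that the formatter pattern must handle.
        DateTimeFormatter custom = DateTimeFormatter.ofPattern("dd-MMM-yyyy", Locale.ENGLISH);
        LocalDate date = LocalDate.parse("03-May-2020", custom);
        System.out.println(date); // 2020-05-03
    }
}
```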




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

