leonardBang commented on a change in pull request #17749:
URL: https://github.com/apache/flink/pull/17749#discussion_r754767968
##########
File path:
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala
##########
@@ -50,84 +52,151 @@ abstract class FsStreamingSinkITCaseBase extends StreamingTestBase {
protected var resultPath: String = _
- private val data = Seq(
- Row.of(Integer.valueOf(1), "a", "b", "2020-05-03", "7"),
- Row.of(Integer.valueOf(2), "p", "q", "2020-05-03", "8"),
- Row.of(Integer.valueOf(3), "x", "y", "2020-05-03", "9"),
- Row.of(Integer.valueOf(4), "x", "y", "2020-05-03", "10"),
- Row.of(Integer.valueOf(5), "x", "y", "2020-05-03", "11"))
+ // custom-format date (MM-dd-yyyy)
+ private val data: Seq[Row] = Seq(
+ Row.of(Integer.valueOf(1), "a", "b", "05-03-2020", "07"),
+ Row.of(Integer.valueOf(2), "p", "q", "05-03-2020", "08"),
+ Row.of(Integer.valueOf(3), "x", "y", "05-03-2020", "09"),
+ Row.of(Integer.valueOf(4), "x", "y", "05-03-2020", "10"),
+ Row.of(Integer.valueOf(5), "x", "y", "05-03-2020", "11"))
+
+ // basic ISO date (yyyyMMdd)
+ private val data2 = Seq(
+ Row.of(Integer.valueOf(1), "a", "b", "20200503", "07"),
+ Row.of(Integer.valueOf(2), "p", "q", "20200503", "08"),
+ Row.of(Integer.valueOf(3), "x", "y", "20200503", "09"),
+ Row.of(Integer.valueOf(4), "x", "y", "20200504", "10"),
+ Row.of(Integer.valueOf(5), "x", "y", "20200504", "11"))
@Before
override def before(): Unit = {
super.before()
- resultPath = tempFolder.newFolder().toURI.toString
env.setParallelism(1)
env.enableCheckpointing(100)
-
- val stream = new DataStream(env.getJavaEnv.addSource(
- new FiniteTestSource(data),
- new RowTypeInfo(Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING)))
-
- tEnv.createTemporaryView("my_table", stream, $("a"), $("b"), $("c"), $("d"), $("e"))
+
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
}
def additionalProperties(): Array[String] = Array()
@Test
def testNonPart(): Unit = {
- test(partition = false)
+ testPartitionCustomFormatDate(partition = false)
}
@Test
def testPart(): Unit = {
- test(partition = true)
- val basePath = new File(new URI(resultPath).getPath, "d=2020-05-03")
+ testPartitionCustomFormatDate(partition = true)
+ val basePath = new File(new URI(resultPath).getPath, "d=05-03-2020")
Assert.assertEquals(5, basePath.list().length)
- Assert.assertTrue(new File(new File(basePath, "e=7"), "_MY_SUCCESS").exists())
- Assert.assertTrue(new File(new File(basePath, "e=8"), "_MY_SUCCESS").exists())
- Assert.assertTrue(new File(new File(basePath, "e=9"), "_MY_SUCCESS").exists())
+ Assert.assertTrue(new File(new File(basePath, "e=07"), "_MY_SUCCESS").exists())
+ Assert.assertTrue(new File(new File(basePath, "e=08"), "_MY_SUCCESS").exists())
+ Assert.assertTrue(new File(new File(basePath, "e=09"), "_MY_SUCCESS").exists())
Assert.assertTrue(new File(new File(basePath, "e=10"), "_MY_SUCCESS").exists())
Assert.assertTrue(new File(new File(basePath, "e=11"), "_MY_SUCCESS").exists())
}
- private def test(partition: Boolean, policy: String = "success-file"): Unit = {
- val dollar = '$'
- val ddl = s"""
- |create table sink_table (
- | a int,
- | b string,
- | c string,
- | d string,
- | e string
- |)
- |${if (partition) "partitioned by (d, e)" else ""}
- |with (
- | 'connector' = 'filesystem',
- | 'path' = '$resultPath',
- | '${PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN.key()}' =
- | '${dollar}d ${dollar}e:00:00',
- | '${SINK_PARTITION_COMMIT_DELAY.key()}' = '1h',
- | '${SINK_PARTITION_COMMIT_POLICY_KIND.key()}' = '$policy',
- | '${SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME.key()}' = '_MY_SUCCESS',
- | ${additionalProperties().mkString(",\n")}
- |)
- """.stripMargin
- tEnv.executeSql(ddl)
-
- tEnv.sqlQuery("select * from my_table").executeInsert("sink_table").await()
-
- check("select * from sink_table", data)
- }
-
@Test
def testMetastorePolicy(): Unit = {
thrown.expectMessage(
"Can not configure a 'metastore' partition commit policy for a file
system table." +
- " You can only configure 'metastore' partition commit policy for a
hive table.")
- test(partition = true, "metastore")
+ " You can only configure 'metastore' partition commit policy for a
hive table.")
+ testPartitionCustomFormatDate(partition = true, "metastore")
}
+
+ @Test
+ def testPartitionWithBasicDate(): Unit = {
+
+ // create source test data stream
+ val fun = (t: Row) => {
+ val localDateTime = LocalDateTime.of(
+ LocalDate.parse(t.getFieldAs[String](3), DateTimeFormatter.BASIC_ISO_DATE),
+ LocalTime.MIDNIGHT)
+ TimestampData.fromLocalDateTime(localDateTime).getMillisecond
+ }
+
+ val stream: DataStream[Row] = new DataStream(env.getJavaEnv.addSource(
+ new FiniteTestSource(data2, fun),
+ new RowTypeInfo(Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING)))
+
+ // write out the data
+ test(stream, "default", "yyyyMMdd", "$d", "d", "partition-time", "1d",
data2)
+ // verify that the written data is correct
+ val basePath = new File(new URI(resultPath).getPath)
+ Assert.assertEquals(2, basePath.list().length)
+ Assert.assertTrue(new File(new File(basePath, "d=20200503"), "_MY_SUCCESS").exists())
+ Assert.assertTrue(new File(new File(basePath, "d=20200504"), "_MY_SUCCESS").exists())
+ }
+
+ def testPartitionCustomFormatDate(partition: Boolean, policy: String = "success-file"): Unit = {
+
+ val fun = (t: Row) => {
+ val localDateTime = LocalDateTime.parse(s"${t.getField(3)} ${t.getField(4)}:00:00",
+ DateTimeFormatter.ofPattern("MM-dd-yyyy HH:mm:ss"))
+ TimestampData.fromLocalDateTime(localDateTime).getMillisecond
+ }
+
+ val stream = new DataStream(env.getJavaEnv.addSource(
+ new FiniteTestSource(data, fun),
+ new RowTypeInfo(Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING)))
+
+ test(stream, "default", "", "$d $e:00:00", if (partition) "d,e" else "",
Review comment:
IIUC, we should pass a `timeExtractorFormatterPattern` here to cover the change you made?
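
For illustration, a minimal sketch of what that might look like, assuming the third argument of `test` is the `timeExtractorFormatterPattern` (mirroring the `test(stream, "default", "yyyyMMdd", ...)` call in `testPartitionWithBasicDate`; the full helper signature is not visible in this hunk):

```scala
// Hypothetical call shape, not the actual helper signature: pass the
// formatter pattern instead of "" so the partition-time extractor is also
// exercised with the custom MM-dd-yyyy format used by `data`.
test(stream, "default", "MM-dd-yyyy HH:mm:ss", "$d $e:00:00",
  if (partition) "d,e" else "" /* remaining arguments as in the original call */)
```

With `data`'s values the timestamp pattern `$d $e:00:00` yields e.g. `05-03-2020 07:00:00`, which `DateTimeFormatter.ofPattern("MM-dd-yyyy HH:mm:ss")` (the formatter `fun` already uses) parses as expected.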
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]