leonardBang commented on a change in pull request #17749:
URL: https://github.com/apache/flink/pull/17749#discussion_r754021984
##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
##########
@@ -297,7 +300,22 @@ public HiveContinuousPartitionFetcherContext(
            if (configuration.contains(STREAMING_SOURCE_CONSUME_START_OFFSET)) {
                String consumeOffsetStr =
                        configuration.getString(STREAMING_SOURCE_CONSUME_START_OFFSET);
-               consumeStartOffset = (T) Long.valueOf(toMills(consumeOffsetStr));
+
+               LocalDateTime localDateTime =
+                       configuration.getString(PARTITION_TIME_EXTRACTOR_FORMATTER_PATTEN)
+                                       == null
+                               ? DefaultPartTimeExtractor.toLocalDateTimeDefault(
+                                       consumeOffsetStr)
+                               : DefaultPartTimeExtractor.toLocalDateTime(
+                                       consumeOffsetStr,
+                                       configuration.getString(
+                                               PARTITION_TIME_EXTRACTOR_FORMATTER_PATTEN));
+
Review comment:
`DefaultPartTimeExtractor.toLocalDateTimeDefault()` and `DefaultPartTimeExtractor.toLocalDateTime()` can be unified into a single method `DefaultPartTimeExtractor.toLocalDateTime(String timestampString, @Nullable String formatterPattern)`.
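A minimal sketch of the unified signature (assuming the default-pattern handling folds into the null branch; the class wrapper and constant are illustrative, and the real `toLocalDateTimeDefault()` also has to cope with date-only strings, which plain `LocalDateTime.parse` would reject):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import javax.annotation.Nullable;

public final class DefaultPartTimeExtractorSketch {

    // Assumption: the built-in default behavior reduces to the 'yyyy-MM-dd HH:mm:ss' pattern.
    private static final DateTimeFormatter DEFAULT_FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Unified entry point: fall back to the default formatter when no pattern is given. */
    public static LocalDateTime toLocalDateTime(
            String timestampString, @Nullable String formatterPattern) {
        DateTimeFormatter formatter =
                formatterPattern == null
                        ? DEFAULT_FORMATTER
                        : DateTimeFormatter.ofPattern(formatterPattern);
        return LocalDateTime.parse(timestampString, formatter);
    }
}
```

The call site in `HiveTableSource` above then collapses to a single `toLocalDateTime(consumeOffsetStr, configuration.getString(PARTITION_TIME_EXTRACTOR_FORMATTER_PATTEN))` call.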
##########
File path: docs/content/docs/connectors/table/filesystem.md
##########
@@ -328,15 +328,30 @@ Time extractors define extracting time from partition values.
        <td>String</td>
        <td>The extractor class for implement PartitionTimeExtractor interface.</td>
    </tr>
+    <tr>
+        <td><h5>partition.time-extractor.formatter-pattern</h5></td>
+        <td style="word-wrap: break-word;">yyyy-MM-dd HH:mm:ss</td>
+        <td>String</td>
+        <td>When 'partition.time-extractor.kind' is set to 'default', you can specify a formatter pattern to get a timestamp from partitions. <br>By default, the format is 'yyyy-MM-dd HH:mm:ss'.
+        <br>Examples as follows:
+        <br>'yyyy-MM-dd' -> '2018-07-14'
+        <br>'dd-MMM-yyyy' -> '14-Jul-2018'
+        <br>'E, MMM dd yyyy' -> 'Sat, Jul 14 2018'
+        <br>More details: https://www.w3.org/QA/Tips/iso-date</td>
+    </tr>
    <tr>
        <td><h5>partition.time-extractor.timestamp-pattern</h5></td>
        <td style="word-wrap: break-word;">(none)</td>
        <td>String</td>
-        <td>The 'default' construction way allows users to use partition fields to get a legal timestamp pattern. Default support 'yyyy-mm-dd hh:mm:ss' from first field. If timestamp should be extracted from a single partition field 'dt', can configure: '$dt'. If timestamp should be extracted from multiple partition fields, say 'year', 'month', 'day' and 'hour', can configure: '$year-$month-$day $hour:00:00'. If timestamp should be extracted from two partition fields 'dt' and 'hour', can configure: '$dt $hour:00:00'.</td>
+        <td>The 'default' construction way allows users to use partition fields to get a legal timestamp pattern. Default support 'yyyy-MM-dd HH:mm:ss' from first field. If timestamp should be extracted from a single partition field 'dt', can configure: '$dt'. If timestamp should be extracted from multiple partition fields, say 'year', 'month', 'day' and 'hour', can configure: '$year-$month-$day $hour:00:00'. If timestamp should be extracted from two partition fields 'dt' and 'hour', can configure: '$dt $hour:00:00'.</td>
    </tr>
  </tbody>
</table>
+
+
+
+
Review comment:
delete these extra blank lines
##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HivePartitionFetcherContextBase.java
##########
@@ -167,7 +169,11 @@ public void open() throws Exception {
                            Short.MAX_VALUE);
            for (String partitionName : partitionNames) {
                List<String> partValues = extractPartitionValues(partitionName);
-               Long partitionTime = toMills(extractor.extract(partitionKeys, partValues));
+
+               Long partitionTime =
+                       TimestampData.fromLocalDateTime(
+                                       extractor.extract(partitionKeys, partValues))
+                               .getMillisecond();
Review comment:
I think the original is more concise?
##########
File path: docs/content.zh/docs/connectors/table/filesystem.md
##########
@@ -276,18 +276,31 @@ The file sink supports file compaction, allowing applications to use smaller check
        <td>String</td>
        <td>The extractor class implementing the PartitionTimeExtractor interface.</td>
    </tr>
+    <tr>
+        <td><h5>partition.time-extractor.formatter-pattern</h5></td>
+        <td style="word-wrap: break-word;">yyyy-MM-dd HH:mm:ss</td>
+        <td>String</td>
+        <td>Specifies the time formatter pattern. When 'partition.time-extractor.kind' is set to 'default', you can specify a formatter pattern for the partition fields.
+        <br>Its default formatter pattern is 'yyyy-MM-dd HH:mm:ss'.
+        <br>Examples as follows:
+        <br>'yyyy-MM-dd' -> '2018-07-14'
+        <br>'dd-MMM-yyyy' -> '14-Jul-2018'
+        <br>'E, MMM dd yyyy' -> 'Sat, Jul 14 2018'
+        <br>More details: https://www.w3.org/QA/Tips/iso-date</td>
+    </tr>
    <tr>
        <td><h5>partition.time-extractor.timestamp-pattern</h5></td>
        <td style="word-wrap: break-word;">(none)</td>
        <td>String</td>
-        <td>The 'default' time extractor allows users to extract a legal timestamp pattern from partition fields. By default, it extracts from the first field using the 'yyyy-mm-dd hh:mm:ss' timestamp pattern.
+        <td>The 'default' time extractor allows users to extract a legal timestamp pattern from partition fields. By default, it extracts from the first field using the 'yyyy-MM-dd HH:mm:ss' timestamp pattern.
        To extract the timestamp from a single partition field such as 'dt', configure: '$dt';
        to extract it from multiple partition fields such as 'year', 'month', 'day' and 'hour', configure: '$year-$month-$day $hour:00:00';
        to extract it from two partition fields such as 'dt' and 'hour', configure: '$dt $hour:00:00'.</td>
    </tr>
  </tbody>
</table>
+
+
The default extractor is based on a timestamp pattern composed of the partition fields. You can also specify a custom extractor implementing the `PartitionTimeExtractor` interface.
Review comment:
delete these extra blank lines
##########
File path: docs/content/docs/connectors/table/filesystem.md
##########
@@ -328,15 +328,30 @@ Time extractors define extracting time from partition values.
        <td>String</td>
        <td>The extractor class for implement PartitionTimeExtractor interface.</td>
    </tr>
+    <tr>
+        <td><h5>partition.time-extractor.formatter-pattern</h5></td>
+        <td style="word-wrap: break-word;">yyyy-MM-dd HH:mm:ss</td>
+        <td>String</td>
+        <td>When 'partition.time-extractor.kind' is set to 'default', you can specify a formatter pattern to get a timestamp from partitions. <br>By default, the format is 'yyyy-MM-dd HH:mm:ss'.
+        <br>Examples as follows:
+        <br>'yyyy-MM-dd' -> '2018-07-14'
+        <br>'dd-MMM-yyyy' -> '14-Jul-2018'
+        <br>'E, MMM dd yyyy' -> 'Sat, Jul 14 2018'
+        <br>More details: https://www.w3.org/QA/Tips/iso-date</td>
Review comment:
The timestamp-formatter is compatible with Java's [DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html).
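For reference, a small self-contained demo of the doc's example patterns with plain `java.time` (the `Locale.ENGLISH` pin is needed so 'MMM' and 'E' match the sample values):

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

public final class FormatterPatternDemo {
    public static void main(String[] args) {
        // The three patterns from the docs table, each parsing its sample value.
        LocalDate d1 = LocalDate.parse(
                "2018-07-14", DateTimeFormatter.ofPattern("yyyy-MM-dd"));
        LocalDate d2 = LocalDate.parse(
                "14-Jul-2018", DateTimeFormatter.ofPattern("dd-MMM-yyyy", Locale.ENGLISH));
        LocalDate d3 = LocalDate.parse(
                "Sat, Jul 14 2018", DateTimeFormatter.ofPattern("E, MMM dd yyyy", Locale.ENGLISH));
        System.out.println(d1 + " " + d2 + " " + d3); // prints 2018-07-14 three times
    }
}
```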
##########
File path: docs/content/docs/connectors/table/filesystem.md
##########
@@ -328,15 +328,30 @@ Time extractors define extracting time from partition values.
        <td>String</td>
        <td>The extractor class for implement PartitionTimeExtractor interface.</td>
    </tr>
+    <tr>
+        <td><h5>partition.time-extractor.formatter-pattern</h5></td>
+        <td style="word-wrap: break-word;">yyyy-MM-dd HH:mm:ss</td>
+        <td>String</td>
+        <td>When 'partition.time-extractor.kind' is set to 'default', you can specify a formatter pattern to get a timestamp from partitions. <br>By default, the format is 'yyyy-MM-dd HH:mm:ss'.
Review comment:
How about the name `timestamp-formatter`, with the default value 'yyyy-MM-dd HH:mm:ss'? It can be combined with `partition.time-extractor.timestamp-pattern`, and it should also support multiple partition fields like `$year$month$day`.
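A rough sketch of the intended combination with plain `java.time` (the two-step resolve-then-parse flow and the option names are my reading of the suggestion, not the current implementation):

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;

public final class CombinedPatternSketch {
    public static void main(String[] args) {
        // Step 1: substitute partition fields year=2020, month=05, day=03
        // into the hypothetical timestamp-pattern '$year$month$day'.
        String resolved = "2020" + "05" + "03"; // -> "20200503"

        // Step 2: parse the resolved string with the timestamp-formatter 'yyyyMMdd';
        // a date-only pattern would default the time part to midnight.
        LocalDate date = LocalDate.parse(resolved, DateTimeFormatter.ofPattern("yyyyMMdd"));
        LocalDateTime partitionTime = LocalDateTime.of(date, LocalTime.MIDNIGHT);
        System.out.println(partitionTime); // 2020-05-03T00:00
    }
}
```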
##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala
##########
@@ -50,84 +52,151 @@ abstract class FsStreamingSinkITCaseBase extends StreamingTestBase {

  protected var resultPath: String = _

-  private val data = Seq(
-    Row.of(Integer.valueOf(1), "a", "b", "2020-05-03", "7"),
-    Row.of(Integer.valueOf(2), "p", "q", "2020-05-03", "8"),
-    Row.of(Integer.valueOf(3), "x", "y", "2020-05-03", "9"),
+  // iso date
+  private val data: Seq[Row] = Seq(
+    Row.of(Integer.valueOf(1), "a", "b", "2020-05-03", "07"),
+    Row.of(Integer.valueOf(2), "p", "q", "2020-05-03", "08"),
+    Row.of(Integer.valueOf(3), "x", "y", "2020-05-03", "09"),
    Row.of(Integer.valueOf(4), "x", "y", "2020-05-03", "10"),
    Row.of(Integer.valueOf(5), "x", "y", "2020-05-03", "11"))

+  // basic iso date
+  private val data2 = Seq(
+    Row.of(Integer.valueOf(1), "a", "b", "20200503", "07"),
+    Row.of(Integer.valueOf(2), "p", "q", "20200503", "08"),
+    Row.of(Integer.valueOf(3), "x", "y", "20200503", "09"),
+    Row.of(Integer.valueOf(4), "x", "y", "20200504", "10"),
+    Row.of(Integer.valueOf(5), "x", "y", "20200504", "11"))
+
  @Before
  override def before(): Unit = {
    super.before()
-    resultPath = tempFolder.newFolder().toURI.toString
    env.setParallelism(1)
    env.enableCheckpointing(100)
-
-    val stream = new DataStream(env.getJavaEnv.addSource(
-      new FiniteTestSource(data),
-      new RowTypeInfo(Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING)))
-
-    tEnv.createTemporaryView("my_table", stream, $("a"), $("b"), $("c"), $("d"), $("e"))
+    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  }

  def additionalProperties(): Array[String] = Array()

  @Test
  def testNonPart(): Unit = {
-    test(partition = false)
+    testIsoDate(partition = false)
  }

  @Test
  def testPart(): Unit = {
-    test(partition = true)
+    testIsoDate(partition = true)
    val basePath = new File(new URI(resultPath).getPath, "d=2020-05-03")
    Assert.assertEquals(5, basePath.list().length)
-    Assert.assertTrue(new File(new File(basePath, "e=7"), "_MY_SUCCESS").exists())
-    Assert.assertTrue(new File(new File(basePath, "e=8"), "_MY_SUCCESS").exists())
-    Assert.assertTrue(new File(new File(basePath, "e=9"), "_MY_SUCCESS").exists())
+    Assert.assertTrue(new File(new File(basePath, "e=07"), "_MY_SUCCESS").exists())
+    Assert.assertTrue(new File(new File(basePath, "e=08"), "_MY_SUCCESS").exists())
+    Assert.assertTrue(new File(new File(basePath, "e=09"), "_MY_SUCCESS").exists())
    Assert.assertTrue(new File(new File(basePath, "e=10"), "_MY_SUCCESS").exists())
    Assert.assertTrue(new File(new File(basePath, "e=11"), "_MY_SUCCESS").exists())
  }

-  private def test(partition: Boolean, policy: String = "success-file"): Unit = {
-    val dollar = '$'
-    val ddl = s"""
-                 |create table sink_table (
-                 |  a int,
-                 |  b string,
-                 |  c string,
-                 |  d string,
-                 |  e string
-                 |)
-                 |${if (partition) "partitioned by (d, e)" else ""}
-                 |with (
-                 |  'connector' = 'filesystem',
-                 |  'path' = '$resultPath',
-                 |  '${PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN.key()}' =
-                 |    '${dollar}d ${dollar}e:00:00',
-                 |  '${SINK_PARTITION_COMMIT_DELAY.key()}' = '1h',
-                 |  '${SINK_PARTITION_COMMIT_POLICY_KIND.key()}' = '$policy',
-                 |  '${SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME.key()}' = '_MY_SUCCESS',
-                 |  ${additionalProperties().mkString(",\n")}
-                 |)
-      """.stripMargin
-    tEnv.executeSql(ddl)
-
-    tEnv.sqlQuery("select * from my_table").executeInsert("sink_table").await()
-
-    check("select * from sink_table", data)
-  }
-
  @Test
  def testMetastorePolicy(): Unit = {
    thrown.expectMessage(
      "Can not configure a 'metastore' partition commit policy for a file system table." +
-        " You can only configure 'metastore' partition commit policy for a hive table.")
-    test(partition = true, "metastore")
+        " You can only configure 'metastore' partition commit policy for a hive table.")
+    testIsoDate(partition = true, "metastore")
+  }
+
+  @Test
+  def testBasicDate(): Unit = {
+
+    // create source test data stream
+    val fun = (t: Row) => {
+      val localDateTime = LocalDateTime.of(
+        LocalDate.parse(t.getFieldAs[String](3), DateTimeFormatter.BASIC_ISO_DATE),
+        LocalTime.MIDNIGHT)
+      TimestampData.fromLocalDateTime(localDateTime).getMillisecond
+    }
+
+    val stream: DataStream[Row] = new DataStream(env.getJavaEnv.addSource(
+      new FiniteTestSource(data2, fun),
+      new RowTypeInfo(Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING)))
+
+    // write out the data
+    test(stream, "default", "yyyyMMdd", "$d", "d", "partition-time", "1d", data2)
+
+    // verify that the written data is correct
+    val basePath = new File(new URI(resultPath).getPath)
+    Assert.assertEquals(2, basePath.list().length)
+    Assert.assertTrue(new File(new File(basePath, "d=20200503"), "_MY_SUCCESS").exists())
+    Assert.assertTrue(new File(new File(basePath, "d=20200504"), "_MY_SUCCESS").exists())
+  }
+
+  def testIsoDate(partition: Boolean, policy: String = "success-file"): Unit = {
Review comment:
how about `testPartitionCustomFormatDate`?
##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala
##########
@@ -139,7 +208,8 @@ abstract class FsStreamingSinkITCaseBase extends StreamingTestBase {
  }
}

-class FiniteTestSource(elements: Iterable[Row]) extends SourceFunction[Row] with CheckpointListener{
+class FiniteTestSource(elements: Iterable[Row], fun: Row => Long) extends SourceFunction[Row]
Review comment:
hint: use a meaningful name, e.g. `fun` -> `watermarkGenerator`
##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala
##########
@@ -50,84 +52,151 @@ abstract class FsStreamingSinkITCaseBase extends StreamingTestBase {

  protected var resultPath: String = _

-  private val data = Seq(
-    Row.of(Integer.valueOf(1), "a", "b", "2020-05-03", "7"),
-    Row.of(Integer.valueOf(2), "p", "q", "2020-05-03", "8"),
-    Row.of(Integer.valueOf(3), "x", "y", "2020-05-03", "9"),
+  // iso date
+  private val data: Seq[Row] = Seq(
+    Row.of(Integer.valueOf(1), "a", "b", "2020-05-03", "07"),
+    Row.of(Integer.valueOf(2), "p", "q", "2020-05-03", "08"),
+    Row.of(Integer.valueOf(3), "x", "y", "2020-05-03", "09"),
    Row.of(Integer.valueOf(4), "x", "y", "2020-05-03", "10"),
    Row.of(Integer.valueOf(5), "x", "y", "2020-05-03", "11"))

+  // basic iso date
+  private val data2 = Seq(
+    Row.of(Integer.valueOf(1), "a", "b", "20200503", "07"),
+    Row.of(Integer.valueOf(2), "p", "q", "20200503", "08"),
+    Row.of(Integer.valueOf(3), "x", "y", "20200503", "09"),
+    Row.of(Integer.valueOf(4), "x", "y", "20200504", "10"),
+    Row.of(Integer.valueOf(5), "x", "y", "20200504", "11"))
+
  @Before
  override def before(): Unit = {
    super.before()
-    resultPath = tempFolder.newFolder().toURI.toString
    env.setParallelism(1)
    env.enableCheckpointing(100)
-
-    val stream = new DataStream(env.getJavaEnv.addSource(
-      new FiniteTestSource(data),
-      new RowTypeInfo(Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING)))
-
-    tEnv.createTemporaryView("my_table", stream, $("a"), $("b"), $("c"), $("d"), $("e"))
+    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  }

  def additionalProperties(): Array[String] = Array()

  @Test
  def testNonPart(): Unit = {
-    test(partition = false)
+    testIsoDate(partition = false)
  }

  @Test
  def testPart(): Unit = {
-    test(partition = true)
+    testIsoDate(partition = true)
    val basePath = new File(new URI(resultPath).getPath, "d=2020-05-03")
    Assert.assertEquals(5, basePath.list().length)
-    Assert.assertTrue(new File(new File(basePath, "e=7"), "_MY_SUCCESS").exists())
-    Assert.assertTrue(new File(new File(basePath, "e=8"), "_MY_SUCCESS").exists())
-    Assert.assertTrue(new File(new File(basePath, "e=9"), "_MY_SUCCESS").exists())
+    Assert.assertTrue(new File(new File(basePath, "e=07"), "_MY_SUCCESS").exists())
+    Assert.assertTrue(new File(new File(basePath, "e=08"), "_MY_SUCCESS").exists())
+    Assert.assertTrue(new File(new File(basePath, "e=09"), "_MY_SUCCESS").exists())
    Assert.assertTrue(new File(new File(basePath, "e=10"), "_MY_SUCCESS").exists())
    Assert.assertTrue(new File(new File(basePath, "e=11"), "_MY_SUCCESS").exists())
  }

-  private def test(partition: Boolean, policy: String = "success-file"): Unit = {
-    val dollar = '$'
-    val ddl = s"""
-                 |create table sink_table (
-                 |  a int,
-                 |  b string,
-                 |  c string,
-                 |  d string,
-                 |  e string
-                 |)
-                 |${if (partition) "partitioned by (d, e)" else ""}
-                 |with (
-                 |  'connector' = 'filesystem',
-                 |  'path' = '$resultPath',
-                 |  '${PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN.key()}' =
-                 |    '${dollar}d ${dollar}e:00:00',
-                 |  '${SINK_PARTITION_COMMIT_DELAY.key()}' = '1h',
-                 |  '${SINK_PARTITION_COMMIT_POLICY_KIND.key()}' = '$policy',
-                 |  '${SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME.key()}' = '_MY_SUCCESS',
-                 |  ${additionalProperties().mkString(",\n")}
-                 |)
-      """.stripMargin
-    tEnv.executeSql(ddl)
-
-    tEnv.sqlQuery("select * from my_table").executeInsert("sink_table").await()
-
-    check("select * from sink_table", data)
-  }
-
  @Test
  def testMetastorePolicy(): Unit = {
    thrown.expectMessage(
      "Can not configure a 'metastore' partition commit policy for a file system table." +
-        " You can only configure 'metastore' partition commit policy for a hive table.")
-    test(partition = true, "metastore")
+        " You can only configure 'metastore' partition commit policy for a hive table.")
+    testIsoDate(partition = true, "metastore")
+  }
+
+  @Test
+  def testBasicDate(): Unit = {
Review comment:
how about `testPartitionWithBasicDate`?
##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala
##########
@@ -50,84 +52,151 @@ abstract class FsStreamingSinkITCaseBase extends StreamingTestBase {

  protected var resultPath: String = _

-  private val data = Seq(
-    Row.of(Integer.valueOf(1), "a", "b", "2020-05-03", "7"),
-    Row.of(Integer.valueOf(2), "p", "q", "2020-05-03", "8"),
-    Row.of(Integer.valueOf(3), "x", "y", "2020-05-03", "9"),
+  // iso date
+  private val data: Seq[Row] = Seq(
+    Row.of(Integer.valueOf(1), "a", "b", "2020-05-03", "07"),
+    Row.of(Integer.valueOf(2), "p", "q", "2020-05-03", "08"),
+    Row.of(Integer.valueOf(3), "x", "y", "2020-05-03", "09"),
    Row.of(Integer.valueOf(4), "x", "y", "2020-05-03", "10"),
    Row.of(Integer.valueOf(5), "x", "y", "2020-05-03", "11"))

+  // basic iso date
+  private val data2 = Seq(
+    Row.of(Integer.valueOf(1), "a", "b", "20200503", "07"),
+    Row.of(Integer.valueOf(2), "p", "q", "20200503", "08"),
+    Row.of(Integer.valueOf(3), "x", "y", "20200503", "09"),
+    Row.of(Integer.valueOf(4), "x", "y", "20200504", "10"),
+    Row.of(Integer.valueOf(5), "x", "y", "20200504", "11"))
+
  @Before
  override def before(): Unit = {
    super.before()
-    resultPath = tempFolder.newFolder().toURI.toString
    env.setParallelism(1)
    env.enableCheckpointing(100)
-
-    val stream = new DataStream(env.getJavaEnv.addSource(
-      new FiniteTestSource(data),
-      new RowTypeInfo(Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING)))
-
-    tEnv.createTemporaryView("my_table", stream, $("a"), $("b"), $("c"), $("d"), $("e"))
+    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  }

  def additionalProperties(): Array[String] = Array()

  @Test
  def testNonPart(): Unit = {
-    test(partition = false)
+    testIsoDate(partition = false)
  }

  @Test
  def testPart(): Unit = {
-    test(partition = true)
+    testIsoDate(partition = true)
    val basePath = new File(new URI(resultPath).getPath, "d=2020-05-03")
    Assert.assertEquals(5, basePath.list().length)
-    Assert.assertTrue(new File(new File(basePath, "e=7"), "_MY_SUCCESS").exists())
-    Assert.assertTrue(new File(new File(basePath, "e=8"), "_MY_SUCCESS").exists())
-    Assert.assertTrue(new File(new File(basePath, "e=9"), "_MY_SUCCESS").exists())
+    Assert.assertTrue(new File(new File(basePath, "e=07"), "_MY_SUCCESS").exists())
+    Assert.assertTrue(new File(new File(basePath, "e=08"), "_MY_SUCCESS").exists())
+    Assert.assertTrue(new File(new File(basePath, "e=09"), "_MY_SUCCESS").exists())
    Assert.assertTrue(new File(new File(basePath, "e=10"), "_MY_SUCCESS").exists())
    Assert.assertTrue(new File(new File(basePath, "e=11"), "_MY_SUCCESS").exists())
  }

-  private def test(partition: Boolean, policy: String = "success-file"): Unit = {
-    val dollar = '$'
-    val ddl = s"""
-                 |create table sink_table (
-                 |  a int,
-                 |  b string,
-                 |  c string,
-                 |  d string,
-                 |  e string
-                 |)
-                 |${if (partition) "partitioned by (d, e)" else ""}
-                 |with (
-                 |  'connector' = 'filesystem',
-                 |  'path' = '$resultPath',
-                 |  '${PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN.key()}' =
-                 |    '${dollar}d ${dollar}e:00:00',
-                 |  '${SINK_PARTITION_COMMIT_DELAY.key()}' = '1h',
-                 |  '${SINK_PARTITION_COMMIT_POLICY_KIND.key()}' = '$policy',
-                 |  '${SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME.key()}' = '_MY_SUCCESS',
-                 |  ${additionalProperties().mkString(",\n")}
-                 |)
-      """.stripMargin
-    tEnv.executeSql(ddl)
-
-    tEnv.sqlQuery("select * from my_table").executeInsert("sink_table").await()
-
-    check("select * from sink_table", data)
-  }
-
  @Test
  def testMetastorePolicy(): Unit = {
    thrown.expectMessage(
      "Can not configure a 'metastore' partition commit policy for a file system table." +
-        " You can only configure 'metastore' partition commit policy for a hive table.")
-    test(partition = true, "metastore")
+        " You can only configure 'metastore' partition commit policy for a hive table.")
+    testIsoDate(partition = true, "metastore")
+  }
+
+  @Test
+  def testBasicDate(): Unit = {
+
+    // create source test data stream
+    val fun = (t: Row) => {
+      val localDateTime = LocalDateTime.of(
+        LocalDate.parse(t.getFieldAs[String](3), DateTimeFormatter.BASIC_ISO_DATE),
+        LocalTime.MIDNIGHT)
+      TimestampData.fromLocalDateTime(localDateTime).getMillisecond
+    }
+
+    val stream: DataStream[Row] = new DataStream(env.getJavaEnv.addSource(
+      new FiniteTestSource(data2, fun),
+      new RowTypeInfo(Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING)))
+
+    // write out the data
+    test(stream, "default", "yyyyMMdd", "$d", "d", "partition-time", "1d", data2)
+
+    // verify that the written data is correct
+    val basePath = new File(new URI(resultPath).getPath)
+    Assert.assertEquals(2, basePath.list().length)
+    Assert.assertTrue(new File(new File(basePath, "d=20200503"), "_MY_SUCCESS").exists())
+    Assert.assertTrue(new File(new File(basePath, "d=20200504"), "_MY_SUCCESS").exists())
+  }
+
+  def testIsoDate(partition: Boolean, policy: String = "success-file"): Unit = {
+
+    val fun = (t: Row) => {
+      val localDateTime = LocalDateTime.parse(s"${t.getField(3)} ${t.getField(4)}:00:00",
+        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
Review comment:
Could you use a non-default format string here to cover what you changed?
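For example (an illustrative pattern choice, shown with plain `java.time` rather than the test harness): something like 'yyyy/MM/dd HH:mm:ss' would exercise the new formatter path instead of the default:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public final class NonDefaultPatternDemo {
    public static void main(String[] args) {
        // A non-default pattern; the 'd' field would then carry values like "2020/05/03".
        LocalDateTime t = LocalDateTime.parse(
                "2020/05/03 07:00:00", DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss"));
        System.out.println(t); // 2020-05-03T07:00
    }
}
```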
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]