twalthr commented on a change in pull request #17776:
URL: https://github.com/apache/flink/pull/17776#discussion_r751256417



##########
File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
##########
@@ -368,6 +372,63 @@ trait FileSystemITCaseBase {
     )
   }
 
+  @Test
+  def testReadAllMetadata(): Unit = {
+    if (!supportsReadingMetadata) {
+      return
+    }
+
+    tableEnv.executeSql(
+      s"""
+         |create table metadataTable (
+         |  x string,
+         |  filepath string metadata,
+         |  filename string metadata,
+         |  size bigint metadata,
+         |  modification_time timestamp_ltz metadata

Review comment:
       Please specify the precision explicitly (`TIMESTAMP_LTZ(3)`); otherwise implicit casts might be added.

##########
File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
##########
@@ -368,6 +372,63 @@ trait FileSystemITCaseBase {
     )
   }
 
+  @Test
+  def testReadAllMetadata(): Unit = {
+    if (!supportsReadingMetadata) {
+      return
+    }
+
+    tableEnv.executeSql(
+      s"""
+         |create table metadataTable (
+         |  x string,
+         |  filepath string metadata,
+         |  filename string metadata,
+         |  size bigint metadata,
+         |  modification_time timestamp_ltz metadata
+         |) with (
+         |  'connector' = 'filesystem',
+         |  'path' = '$resultPath',
+         |  ${formatProperties().mkString(",\n")}
+         |)
+         """.stripMargin
+    )
+
+    tableEnv.executeSql(
+      "insert into nonPartitionedTable (x) select x from originalT limit 
1").await()
+
+    checkPredicate(
+      "select * from metadataTable",
+      row => {
+        assertEquals(5, row.getArity)
+
+        // Only one file, because we don't have partitions
+        val file = new File(URI.create(resultPath).getPath).listFiles()(0)
+
+        assertEquals(
+          file.getPath,

Review comment:
       Is this safe across platforms like Windows? Or shall we simply check 
that the filename is contained?

##########
File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
##########
@@ -368,6 +372,63 @@ trait FileSystemITCaseBase {
     )
   }
 
+  @Test
+  def testReadAllMetadata(): Unit = {
+    if (!supportsReadingMetadata) {
+      return
+    }
+
+    tableEnv.executeSql(
+      s"""
+         |create table metadataTable (

Review comment:
       Btw, for newly added SQL statements, can we agree on upper-case SQL 
keywords like:
   ```
   CREATE TABLE metadataTable (x STRING) ...
   ```
   
   This makes the statements easier to read.

##########
File path: 
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplit.java
##########
@@ -67,6 +68,12 @@
     /** The number of bytes in the file to process. */
     private final long length;
 
+    /** The modification time of the file, from {@link 
FileStatus#getModificationTime()}. */
+    private final long modificationTime;
+
+    /** The modification time of the file, from {@link FileStatus#getLen()}. */

Review comment:
       Copy-paste error: this Javadoc still says "modification time" but links 
to `FileStatus#getLen()`.

##########
File path: 
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplit.java
##########
@@ -67,6 +68,12 @@
     /** The number of bytes in the file to process. */
     private final long length;
 
+    /** The modification time of the file, from {@link 
FileStatus#getModificationTime()}. */
+    private final long modificationTime;

Review comment:
       prefix with `file` as well if it is not related to the split?

##########
File path: 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
##########
@@ -440,6 +445,44 @@ public void applyReadableMetadata(List<String> 
metadataKeys, DataType producedDa
                     public Object getValue(FileSourceSplit split) {
                         return StringData.fromString(split.path().getPath());
                     }
+                }),
+        FILENAME(
+                "filename",
+                DataTypes.STRING().notNull(),
+                new FileInfoAccessor() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object getValue(FileSourceSplit split) {
+                        return StringData.fromString(
+                                
Paths.get(split.path().getPath()).getFileName().toString());
+                    }
+                }),
+        SIZE(
+                "size",
+                DataTypes.BIGINT().notNull(),
+                new FileInfoAccessor() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object getValue(FileSourceSplit split) {
+                        return split.fileSize();
+                    }
+                }),
+        MODIFICATION_TIME(
+                "modification_time",
+                DataTypes.TIMESTAMP_LTZ(3).notNull(),
+                new FileInfoAccessor() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object getValue(FileSourceSplit split) {
+                        long nanos = split.modificationTime();
+                        return TimestampData.fromLocalDateTime(

Review comment:
       `TimestampData.fromInstant`?

##########
File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
##########
@@ -368,6 +372,63 @@ trait FileSystemITCaseBase {
     )
   }
 
+  @Test
+  def testReadAllMetadata(): Unit = {
+    if (!supportsReadingMetadata) {
+      return
+    }
+
+    tableEnv.executeSql(
+      s"""
+         |create table metadataTable (
+         |  x string,
+         |  filepath string metadata,
+         |  filename string metadata,
+         |  size bigint metadata,
+         |  modification_time timestamp_ltz metadata
+         |) with (
+         |  'connector' = 'filesystem',
+         |  'path' = '$resultPath',
+         |  ${formatProperties().mkString(",\n")}
+         |)
+         """.stripMargin
+    )
+
+    tableEnv.executeSql(
+      "insert into nonPartitionedTable (x) select x from originalT limit 
1").await()
+
+    checkPredicate(
+      "select * from metadataTable",
+      row => {
+        assertEquals(5, row.getArity)
+
+        // Only one file, because we don't have partitions
+        val file = new File(URI.create(resultPath).getPath).listFiles()(0)
+
+        assertEquals(
+          file.getPath,
+          row.getFieldAs[String](1)
+        )
+        assertEquals(
+          Paths.get(file.toURI).getFileName.toString,
+          row.getFieldAs[String](2)
+        )
+        assertEquals(
+          file.length(),
+          row.getFieldAs[Long](3)
+        )
+        assertEquals(
+          // Note: It's TIMESTAMP_LTZ
+          Instant.ofEpochMilli(file.lastModified())
+            .atZone(DateTimeUtils.UTC_ZONE.toZoneId)

Review comment:
       why is `Instant.ofEpochMilli(file.lastModified())` not enough?

##########
File path: docs/content/docs/connectors/table/filesystem.md
##########
@@ -126,6 +126,21 @@ The following connector metadata can be accessed as 
metadata columns in a table
       <td><code>STRING NOT NULL</code></td>
       <td>Full path of the input file.</td>
     </tr>
+    <tr>
+      <td><code>filename</code></td>
+      <td><code>STRING NOT NULL</code></td>
+      <td>Name of the file, that is the farthest element from the root of the 
filepath.</td>
+    </tr>
+    <tr>
+      <td><code>size</code></td>

Review comment:
       `filename` vs `size` sounds inconsistent. Shall we call them `file.name`, 
`file.size`, `file.modification-time`? This would be more consistent with 
existing metadata and other option keys.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to