twalthr commented on a change in pull request #17776:
URL: https://github.com/apache/flink/pull/17776#discussion_r751387893



##########
File path: docs/content/docs/connectors/table/filesystem.md
##########
@@ -122,22 +122,22 @@ The following connector metadata can be accessed as metadata columns in a table
     </thead>
     <tbody>
     <tr>
-      <td><code>filepath</code></td>
+      <td><code>file.path</code></td>
       <td><code>STRING NOT NULL</code></td>
       <td>Full path of the input file.</td>
     </tr>
     <tr>
-      <td><code>filename</code></td>
+      <td><code>file.name</code></td>
       <td><code>STRING NOT NULL</code></td>
       <td>Name of the file, that is the farthest element from the root of the filepath.</td>
     </tr>
     <tr>
-      <td><code>size</code></td>
+      <td><code>file.size</code></td>
       <td><code>BIGINT NOT NULL</code></td>
       <td>Byte count of the file.</td>
     </tr>
     <tr>
-      <td><code>modification_time</code></td>
+      <td><code>file.modification_time</code></td>

Review comment:
       Suggest `file.modification-time` (hyphen instead of underscore).

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
##########
@@ -368,6 +372,63 @@ trait FileSystemITCaseBase {
     )
   }
 
+  @Test
+  def testReadAllMetadata(): Unit = {
+    if (!supportsReadingMetadata) {
+      return
+    }
+
+    tableEnv.executeSql(
+      s"""
+         |create table metadataTable (
+         |  x string,
+         |  filepath string metadata,
+         |  filename string metadata,
+         |  size bigint metadata,
+         |  modification_time timestamp_ltz metadata
+         |) with (
+         |  'connector' = 'filesystem',
+         |  'path' = '$resultPath',
+         |  ${formatProperties().mkString(",\n")}
+         |)
+         """.stripMargin
+    )
+
+    tableEnv.executeSql(
+      "insert into nonPartitionedTable (x) select x from originalT limit 1").await()
+
+    checkPredicate(
+      "select * from metadataTable",
+      row => {
+        assertEquals(5, row.getArity)
+
+        // Only one file, because we don't have partitions
+        val file = new File(URI.create(resultPath).getPath).listFiles()(0)
+
+        assertEquals(
+          file.getPath,
+          row.getFieldAs[String](1)
+        )
+        assertEquals(
+          Paths.get(file.toURI).getFileName.toString,
+          row.getFieldAs[String](2)
+        )
+        assertEquals(
+          file.length(),
+          row.getFieldAs[Long](3)
+        )
+        assertEquals(
+          // Note: It's TIMESTAMP_LTZ
+          Instant.ofEpochMilli(file.lastModified())
+            .atZone(DateTimeUtils.UTC_ZONE.toZoneId)

Review comment:
       Is this documented somewhere? Or can this be different per filesystem?
   
   Because I see this in `FileStatus`:
   ```
       /**
        * Get the modification time of the file.
        *
        * @return the modification time of file in milliseconds since January 1, 1970 UTC.
        */
       long getModificationTime();
   ```
   
   So UTC?
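
For reference, a minimal Java sketch of the point behind this question, assuming the value really is milliseconds since the epoch as the `FileStatus` Javadoc states. The class `EpochMillisDemo` and its helpers are hypothetical names used only for illustration, and the sketch does not confirm how any particular filesystem implementation behaves. It shows that an epoch-millisecond value identifies a single instant, and that the zone passed to `atZone` only changes the local rendering:

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

public class EpochMillisDemo {

    // Hypothetical helper: treats a value such as the one returned by
    // FileStatus#getModificationTime() as milliseconds since 1970-01-01T00:00:00Z.
    public static Instant toInstant(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    // Renders the same instant in the given zone; only the local
    // date-time fields differ between zones, not the instant itself.
    public static ZonedDateTime inZone(long epochMillis, ZoneId zone) {
        return toInstant(epochMillis).atZone(zone);
    }

    public static void main(String[] args) {
        long modificationTime = 1_637_000_000_000L; // arbitrary example value

        ZonedDateTime utc = inZone(modificationTime, ZoneOffset.UTC);
        ZonedDateTime berlin = inZone(modificationTime, ZoneId.of("Europe/Berlin"));

        System.out.println(utc);
        System.out.println(berlin);
        // Both renderings refer to the same point on the timeline.
        System.out.println(utc.toInstant().equals(berlin.toInstant()));
    }
}
```

If the contract holds for every filesystem, comparing instants rather than zoned local values would make the test assertion independent of the zone chosen.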



