This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 0aaf3d2  [hotfix] Value field should be named in value count mode
0aaf3d2 is described below

commit 0aaf3d26275cc70a40c916cd7c8815313f1bdd00
Author: Jane Chan <[email protected]>
AuthorDate: Fri Apr 8 15:25:05 2022 +0800

    [hotfix] Value field should be named in value count mode
    
    This closes #79
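    
    Context for the fix: RowType.of(new BigIntType(false)) assigns Flink's
    default field name "f0" to the count column, which can collide with a
    user-declared column of the same name; the change gives the field the
    reserved name "_VALUE_COUNT" instead. Below is a minimal sketch of the
    difference, assuming the standard Flink logical type API (the class
    name is hypothetical and not part of the patch):
    
        import org.apache.flink.table.types.logical.BigIntType;
        import org.apache.flink.table.types.logical.LogicalType;
        import org.apache.flink.table.types.logical.RowType;
    
        // Hypothetical illustration class, not part of the patch.
        public class ValueTypeSketch {
            public static void main(String[] args) {
                // Before the fix: the single count field gets the default name "f0".
                RowType unnamed = RowType.of(new BigIntType(false));
    
                // After the fix: the count field carries an explicit name that
                // cannot clash with a user column such as "f0".
                RowType named =
                        RowType.of(
                                new LogicalType[] {new BigIntType(false)},
                                new String[] {"_VALUE_COUNT"});
    
                System.out.println(unnamed.getFieldNames()); // [f0]
                System.out.println(named.getFieldNames());   // [_VALUE_COUNT]
            }
        }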
---
 .../flink/table/store/connector/TableStore.java    |  5 +++-
 .../store/connector/ReadWriteTableITCase.java      | 27 ++++++++++++++++++++++
 2 files changed, 31 insertions(+), 1 deletion(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
index 88826f0..37315c4 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -171,7 +171,10 @@ public class TableStore {
         MergeFunction mergeFunction;
         if (primaryKeys.length == 0) {
             keyType = type;
-            valueType = RowType.of(new BigIntType(false));
+            valueType =
+                    RowType.of(
+                            new LogicalType[] {new BigIntType(false)},
+                            new String[] {"_VALUE_COUNT"});
             mergeFunction = new ValueCountMergeFunction();
         } else {
             List<RowType.RowField> fields = TypeUtils.project(type, primaryKeys).getFields();
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
index 4596dec..bff091c 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -53,6 +53,7 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
+import static org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData;
 import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.dailyRates;
 import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.dailyRatesChangelogWithUB;
 import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.dailyRatesChangelogWithoutUB;
@@ -1197,6 +1198,32 @@ public class ReadWriteTableITCase extends ReadWriteTableTestBase {
         testSinkParallelism(23, 23);
     }
 
+    @Test
+    public void testQueryContainsDefaultFieldName() throws Exception {
+        rootPath = TEMPORARY_FOLDER.newFolder().getPath();
+        tEnv = StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
+        String id = registerData(Collections.singletonList(changelogRow("+I", 1, "abc")));
+        tEnv.executeSql(
+                String.format(
+                        "create table dummy_source ("
+                                + "f0 int, "
+                                + "f1 string) with ("
+                                + "'connector' = 'values', "
+                                + "'bounded' = 'true', "
+                                + "'data-id' = '%s')",
+                        id));
+        tEnv.executeSql(
+                String.format(
+                        "create table managed_table with ('file.path' = '%s') "
+                                + "like dummy_source (excluding options)",
+                        rootPath));
+        tEnv.executeSql("insert into managed_table select * from 
dummy_source").await();
+        BlockingIterator<Row, Row> iterator =
+                BlockingIterator.of(tEnv.executeSql("select * from managed_table").collect());
+        assertThat(iterator.collect(1, 5, TimeUnit.SECONDS))
+                .containsOnly(changelogRow("+I", 1, "abc"));
+    }
+
     // ------------------------ Tools ----------------------------------
 
     private String collectAndCheckBatchReadWrite(
