This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 0aaf3d2 [hotfix] Value field should be named in value count mode
0aaf3d2 is described below
commit 0aaf3d26275cc70a40c916cd7c8815313f1bdd00
Author: Jane Chan <[email protected]>
AuthorDate: Fri Apr 8 15:25:05 2022 +0800
[hotfix] Value field should be named in value count mode
This closes #79
---
.../flink/table/store/connector/TableStore.java | 5 +++-
.../store/connector/ReadWriteTableITCase.java | 27 ++++++++++++++++++++++
2 files changed, 31 insertions(+), 1 deletion(-)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
index 88826f0..37315c4 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -171,7 +171,10 @@ public class TableStore {
MergeFunction mergeFunction;
if (primaryKeys.length == 0) {
keyType = type;
- valueType = RowType.of(new BigIntType(false));
+ valueType =
+ RowType.of(
+ new LogicalType[] {new BigIntType(false)},
+ new String[] {"_VALUE_COUNT"});
mergeFunction = new ValueCountMergeFunction();
} else {
List<RowType.RowField> fields = TypeUtils.project(type, primaryKeys).getFields();
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
index 4596dec..bff091c 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -53,6 +53,7 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
+import static org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData;
import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.dailyRates;
import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.dailyRatesChangelogWithUB;
import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.dailyRatesChangelogWithoutUB;
@@ -1197,6 +1198,32 @@ public class ReadWriteTableITCase extends ReadWriteTableTestBase {
testSinkParallelism(23, 23);
}
+ @Test
+ public void testQueryContainsDefaultFieldName() throws Exception {
+ rootPath = TEMPORARY_FOLDER.newFolder().getPath();
+ tEnv = StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
+ String id = registerData(Collections.singletonList(changelogRow("+I", 1, "abc")));
+ tEnv.executeSql(
+ String.format(
+ "create table dummy_source ("
+ + "f0 int, "
+ + "f1 string) with ("
+ + "'connector' = 'values', "
+ + "'bounded' = 'true', "
+ + "'data-id' = '%s')",
+ id));
+ tEnv.executeSql(
+ String.format(
+ "create table managed_table with ('file.path' = '%s') "
+ + "like dummy_source (excluding options)",
+ rootPath));
+ tEnv.executeSql("insert into managed_table select * from dummy_source").await();
+ BlockingIterator<Row, Row> iterator =
+ BlockingIterator.of(tEnv.executeSql("select * from managed_table").collect());
+ assertThat(iterator.collect(1, 5, TimeUnit.SECONDS))
+ .containsOnly(changelogRow("+I", 1, "abc"));
+ }
+
// ------------------------ Tools ----------------------------------
private String collectAndCheckBatchReadWrite(