[
https://issues.apache.org/jira/browse/BEAM-3983?focusedWorklogId=90531&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-90531
]
ASF GitHub Bot logged work on BEAM-3983:
----------------------------------------
Author: ASF GitHub Bot
Created on: 12/Apr/18 18:33
Start Date: 12/Apr/18 18:33
Worklog Time Spent: 10m
Work Description: XuMingmin closed pull request #4991: [BEAM-3983] [SQL]
Tables interface supports BigQuery
URL: https://github.com/apache/beam/pull/4991
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
index 5d484c90011..6598ee3e892 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
@@ -23,7 +23,7 @@
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
/**
@@ -46,7 +46,7 @@
* create a {@code IO.write()} instance to write to target.
*
*/
- PTransform<? super PCollection<Row>, PDone> buildIOWriter();
+ PTransform<? super PCollection<Row>, POutput> buildIOWriter();
/**
* Get the schema info of the table.
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java
index 794e3e69966..15e8b960658 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java
@@ -20,7 +20,6 @@
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Strings;
-import java.net.URI;
import java.util.List;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlIdentifier;
@@ -109,8 +108,8 @@ public String tableName() {
return tblName.toString();
}
- public URI location() {
- return location == null ? null : URI.create(getString(location));
+ public String location() {
+ return location == null ? null : getString(location);
}
public String type() {
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java
index 5a24f473366..0b10b4c6b19 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java
@@ -22,7 +22,7 @@
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
/**
@@ -56,7 +56,7 @@ public BeamIOType getSourceType() {
}
@Override
- public PTransform<? super PCollection<Row>, PDone> buildIOWriter() {
+ public PTransform<? super PCollection<Row>, POutput> buildIOWriter() {
throw new IllegalArgumentException("cannot use [BeamPCollectionTable] as
target");
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java
index 4af82a0d6bd..a3339afc37d 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java
@@ -21,7 +21,6 @@
import com.alibaba.fastjson.JSONObject;
import com.google.auto.value.AutoValue;
import java.io.Serializable;
-import java.net.URI;
import java.util.List;
import javax.annotation.Nullable;
@@ -37,7 +36,7 @@
@Nullable
public abstract String getComment();
@Nullable
- public abstract URI getLocation();
+ public abstract String getLocation();
@Nullable
public abstract JSONObject getProperties();
@@ -45,14 +44,6 @@ public static Builder builder() {
return new
org.apache.beam.sdk.extensions.sql.meta.AutoValue_Table.Builder();
}
- public String getLocationAsString() {
- if (getLocation() == null) {
- return null;
- }
-
- return "/" + getLocation().getHost() + getLocation().getPath();
- }
-
/**
* Builder class for {@link Table}.
*/
@@ -62,7 +53,7 @@ public String getLocationAsString() {
public abstract Builder name(String name);
public abstract Builder columns(List<Column> columns);
public abstract Builder comment(String name);
- public abstract Builder location(URI location);
+ public abstract Builder location(String location);
public abstract Builder properties(JSONObject properties);
public abstract Table build();
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
index 4d31e4c6406..129e15e9e58 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
@@ -19,7 +19,6 @@
import static com.google.common.base.Preconditions.checkArgument;
-import java.io.Serializable;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
@@ -33,6 +32,7 @@
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -43,7 +43,7 @@
* extend to convert between {@code BeamSqlRow} and {@code KV<byte[], byte[]>}.
*
*/
-public abstract class BeamKafkaTable extends BaseBeamTable implements
Serializable {
+public abstract class BeamKafkaTable extends BaseBeamTable {
private String bootstrapServers;
private List<String> topics;
private List<TopicPartition> topicPartitions;
@@ -109,11 +109,11 @@ public BeamIOType getSourceType() {
}
@Override
- public PTransform<? super PCollection<Row>, PDone> buildIOWriter() {
+ public PTransform<? super PCollection<Row>, POutput> buildIOWriter() {
checkArgument(topics != null && topics.size() == 1,
"Only one topic can be acceptable as output.");
- return new PTransform<PCollection<Row>, PDone>() {
+ return new PTransform<PCollection<Row>, POutput>() {
@Override
public PDone expand(PCollection<Row> input) {
return input.apply("out_reformat",
getPTransformForOutput()).apply("persistent",
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTable.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTable.java
index e407a4d8378..d0b59001d5d 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTable.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTable.java
@@ -24,7 +24,7 @@
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
import org.apache.commons.csv.CSVFormat;
@@ -62,7 +62,7 @@ public BeamTextCSVTable(Schema schema, String filePattern,
}
@Override
- public PTransform<? super PCollection<Row>, PDone> buildIOWriter() {
+ public PTransform<? super PCollection<Row>, POutput> buildIOWriter() {
return new BeamTextCSVTableIOWriter(schema, filePattern, csvFormat);
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableIOWriter.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableIOWriter.java
index d32c9dfec88..aea826da116 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableIOWriter.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableIOWriter.java
@@ -27,14 +27,14 @@
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
import org.apache.commons.csv.CSVFormat;
/**
* IOWriter for {@code BeamTextCSVTable}.
*/
-public class BeamTextCSVTableIOWriter extends PTransform<PCollection<Row>,
PDone>
+public class BeamTextCSVTableIOWriter extends PTransform<PCollection<Row>,
POutput>
implements Serializable {
private String filePattern;
protected Schema schema;
@@ -49,7 +49,7 @@ public BeamTextCSVTableIOWriter(Schema schema,
}
@Override
- public PDone expand(PCollection<Row> input) {
+ public POutput expand(PCollection<Row> input) {
return input.apply("encodeRecord", ParDo.of(new DoFn<Row, String>() {
@ProcessElement
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextTable.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextTable.java
index 841f4e2885d..0a5e9ee184c 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextTable.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextTable.java
@@ -18,7 +18,6 @@
package org.apache.beam.sdk.extensions.sql.meta.provider.text;
-import java.io.Serializable;
import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
import org.apache.beam.sdk.extensions.sql.impl.schema.BeamIOType;
import org.apache.beam.sdk.schemas.Schema;
@@ -26,7 +25,7 @@
/**
* {@code BeamTextTable} represents a text file/directory(backed by {@code
TextIO}).
*/
-public abstract class BeamTextTable extends BaseBeamTable implements
Serializable {
+public abstract class BeamTextTable extends BaseBeamTable {
protected String filePattern;
protected BeamTextTable(Schema schema, String filePattern) {
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
index 6a612c4f98f..102069abd88 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
@@ -40,7 +40,7 @@
* )
* TYPE 'text'
* COMMENT 'this is the table orders'
- * LOCATION 'text://home/admin/orders'
+ * LOCATION '/home/admin/orders'
* TBLPROPERTIES '{"format": "Excel"}' -- format of each text line(csv format)
* }</pre>
*/
@@ -53,7 +53,7 @@
@Override public BeamSqlTable buildBeamSqlTable(Table table) {
Schema schema = getRowTypeFromTable(table);
- String filePattern = table.getLocationAsString();
+ String filePattern = table.getLocation();
CSVFormat format = CSVFormat.DEFAULT;
JSONObject properties = table.getProperties();
String csvFormatStr = properties.getString("format");
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
index 9bf724d797f..93b29d9beef 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
@@ -44,7 +44,7 @@ public void testExecute_createTextTable() throws Exception {
+ "name varchar(31) COMMENT 'name', \n"
+ "age int COMMENT 'age') \n"
+ "TYPE 'text' \n"
- + "COMMENT '' LOCATION 'text://home/admin/orders'"
+ + "COMMENT '' LOCATION '/home/admin/orders'"
);
Table table = metaStore.getTable("person");
assertNotNull(table);
@@ -63,7 +63,7 @@ public void testExecute_dropTable() throws Exception {
+ "name varchar(31) COMMENT 'name', \n"
+ "age int COMMENT 'age') \n"
+ "TYPE 'text' \n"
- + "COMMENT '' LOCATION 'text://home/admin/orders'"
+ + "COMMENT '' LOCATION '/home/admin/orders'"
);
Table table = metaStore.getTable("person");
assertNotNull(table);
@@ -86,7 +86,7 @@ public void
testExecute_dropTable_assertTableRemovedFromPlanner() throws Excepti
+ "name varchar(31) COMMENT 'name', \n"
+ "age int COMMENT 'age') \n"
+ "TYPE 'text' \n"
- + "COMMENT '' LOCATION 'text://home/admin/orders'"
+ + "COMMENT '' LOCATION '/home/admin/orders'"
);
cli.execute("drop table person");
cli.explainQuery("select * from person");
@@ -107,7 +107,7 @@ public void testExplainQuery() throws Exception {
+ "name varchar(31) COMMENT 'name', \n"
+ "age int COMMENT 'age') \n"
+ "TYPE 'text' \n"
- + "COMMENT '' LOCATION 'text://home/admin/orders'"
+ + "COMMENT '' LOCATION '/home/admin/orders'"
);
String plan = cli.explainQuery("select * from person");
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
index 14fb484c552..6d5ac1ab778 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
@@ -24,7 +24,6 @@
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableList;
-import java.net.URI;
import org.apache.beam.sdk.extensions.sql.RowSqlTypes;
import org.apache.beam.sdk.extensions.sql.meta.Column;
import org.apache.beam.sdk.extensions.sql.meta.Table;
@@ -50,7 +49,7 @@ public void testParseCreateTable_full() throws Exception {
+ "name varchar(31) COMMENT 'name') \n"
+ "TYPE 'text' \n"
+ "COMMENT 'person table' \n"
- + "LOCATION 'text://home/admin/person'\n"
+ + "LOCATION '/home/admin/person'\n"
+ "TBLPROPERTIES '{\"hello\": [\"james\", \"bond\"]}'"
);
assertEquals(
@@ -66,7 +65,7 @@ public void testParseCreateTable_withoutType() throws
Exception {
+ "id int COMMENT 'id', \n"
+ "name varchar(31) COMMENT 'name') \n"
+ "COMMENT 'person table' \n"
- + "LOCATION 'text://home/admin/person'\n"
+ + "LOCATION '/home/admin/person'\n"
+ "TBLPROPERTIES '{\"hello\": [\"james\", \"bond\"]}'"
);
}
@@ -84,7 +83,7 @@ public void testParseCreateTable_withoutTableComment() throws
Exception {
+ "id int COMMENT 'id', \n"
+ "name varchar(31) COMMENT 'name') \n"
+ "TYPE 'text' \n"
- + "LOCATION 'text://home/admin/person'\n"
+ + "LOCATION '/home/admin/person'\n"
+ "TBLPROPERTIES '{\"hello\": [\"james\", \"bond\"]}'"
);
assertEquals(mockTable("person", "text", null, properties), table);
@@ -98,7 +97,7 @@ public void testParseCreateTable_withoutTblProperties()
throws Exception {
+ "name varchar(31) COMMENT 'name') \n"
+ "TYPE 'text' \n"
+ "COMMENT 'person table' \n"
- + "LOCATION 'text://home/admin/person'\n"
+ + "LOCATION '/home/admin/person'\n"
);
assertEquals(
mockTable("person", "text", "person table", new JSONObject()),
@@ -145,21 +144,17 @@ private Table parseTable(String sql) throws Exception {
}
private static Table mockTable(String name, String type, String comment,
JSONObject properties) {
- return mockTable(name, type, comment, properties, "text://home/admin/" +
name);
+ return mockTable(name, type, comment, properties, "/home/admin/" + name);
}
private static Table mockTable(String name, String type, String comment,
JSONObject properties,
String location) {
- URI locationURI = null;
- if (location != null) {
- locationURI = URI.create(location);
- }
return Table.builder()
.name(name)
.type(type)
.comment(comment)
- .location(locationURI)
+ .location(location)
.columns(ImmutableList.of(
Column.builder()
.name("id")
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
index f4253dd933c..47d3b06fb63 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
@@ -36,7 +36,7 @@
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
import org.joda.time.DateTime;
import org.joda.time.Duration;
@@ -123,7 +123,7 @@ public BeamIOType getSourceType() {
}
@Override
- public PTransform<? super PCollection<Row>, PDone> buildIOWriter() {
+ public PTransform<? super PCollection<Row>, POutput> buildIOWriter() {
throw new UnsupportedOperationException();
}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
index ffe8dcc8f00..58b048e39f0 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
@@ -24,7 +24,6 @@
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableList;
-import java.net.URI;
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.RowSqlTypes;
import org.apache.beam.sdk.extensions.sql.meta.Column;
@@ -65,7 +64,7 @@ private static Table mockTable(String name) {
return Table.builder()
.name(name)
.comment(name + " table")
- .location(URI.create("kafka://localhost:2181/brokers?topic=test"))
+ .location("kafka://localhost:2181/brokers?topic=test")
.columns(ImmutableList.of(
Column.builder()
.name("id")
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java
index 11ecaf43082..7b47e1ab3f1 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java
@@ -23,7 +23,6 @@
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableList;
-import java.net.URI;
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.RowSqlTypes;
import org.apache.beam.sdk.extensions.sql.meta.Column;
@@ -76,7 +75,7 @@ private static Table mockTable(String name, String format) {
return Table.builder()
.name(name)
.comment(name + " table")
- .location(URI.create("text://home/admin/" + name))
+ .location("/home/admin/" + name)
.columns(ImmutableList.of(
Column.builder()
.name("id")
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java
index a3e391e68d2..2ee027a0294 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java
@@ -25,7 +25,6 @@
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableList;
-import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
@@ -129,7 +128,7 @@ private static Table mockTable(String name, String type) {
return Table.builder()
.name(name)
.comment(name + " table")
- .location(URI.create("text://home/admin/" + name))
+ .location("/home/admin/" + name)
.columns(ImmutableList.of(
Column.builder()
.name("id")
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
index 249daee1cb4..24d22e1401d 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
@@ -34,6 +34,7 @@
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
/**
@@ -104,7 +105,7 @@ public BeamIOType getSourceType() {
"MockedBoundedTable_Reader_" + COUNTER.incrementAndGet(),
Create.of(rows));
}
- @Override public PTransform<? super PCollection<Row>, PDone> buildIOWriter()
{
+ @Override public PTransform<? super PCollection<Row>, POutput>
buildIOWriter() {
return new OutputStore();
}
@@ -112,7 +113,7 @@ public BeamIOType getSourceType() {
* Keep output in {@code CONTENT} for validation.
*
*/
- public static class OutputStore extends PTransform<PCollection<Row>, PDone> {
+ public static class OutputStore extends PTransform<PCollection<Row>,
POutput> {
@Override
public PDone expand(PCollection<Row> input) {
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java
index 0ed77babd5d..cd52de04465 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java
@@ -23,7 +23,7 @@
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
/**
@@ -36,7 +36,7 @@ public MockedTable(Schema beamSchema) {
}
@Override
- public PTransform<? super PCollection<Row>, PDone> buildIOWriter() {
+ public PTransform<? super PCollection<Row>, POutput> buildIOWriter() {
throw new UnsupportedOperationException("buildIOWriter unsupported!");
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 90531)
Time Spent: 3h 10m (was: 3h)
> BigQuery writes from pure SQL
> -----------------------------
>
> Key: BEAM-3983
> URL: https://issues.apache.org/jira/browse/BEAM-3983
> Project: Beam
> Issue Type: New Feature
> Components: dsl-sql
> Reporter: Andrew Pilloud
> Assignee: Andrew Pilloud
> Priority: Major
> Time Spent: 3h 10m
> Remaining Estimate: 0h
>
> It would be nice if you could write to BigQuery in SQL without writing any
> java code. For example:
> {code:java}
> INSERT INTO bigquery SELECT * FROM PCOLLECTION{code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)