This is an automated email from the ASF dual-hosted git repository.
mingmxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e981b43 [BEAM-3983] [SQL] Tables interface supports BigQuery (#4991)
e981b43 is described below
commit e981b439b6924dba20e7285091d343ae4d41765a
Author: Andrew Pilloud <[email protected]>
AuthorDate: Thu Apr 12 11:33:00 2018 -0700
[BEAM-3983] [SQL] Tables interface supports BigQuery (#4991)
* [SQL] POutput interface instead of PDone type
* [SQL] Location isn't necessarily a URI
---
.../apache/beam/sdk/extensions/sql/BeamSqlTable.java | 4 ++--
.../sdk/extensions/sql/impl/parser/SqlCreateTable.java | 5 ++---
.../sql/impl/schema/BeamPCollectionTable.java | 4 ++--
.../org/apache/beam/sdk/extensions/sql/meta/Table.java | 13 ++-----------
.../sql/meta/provider/kafka/BeamKafkaTable.java | 8 ++++----
.../sql/meta/provider/text/BeamTextCSVTable.java | 4 ++--
.../meta/provider/text/BeamTextCSVTableIOWriter.java | 6 +++---
.../sql/meta/provider/text/BeamTextTable.java | 3 +--
.../sql/meta/provider/text/TextTableProvider.java | 4 ++--
.../apache/beam/sdk/extensions/sql/BeamSqlCliTest.java | 8 ++++----
.../extensions/sql/impl/parser/BeamSqlParserTest.java | 17 ++++++-----------
.../sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java | 4 ++--
.../sql/meta/provider/kafka/KafkaTableProviderTest.java | 3 +--
.../sql/meta/provider/text/TextTableProviderTest.java | 3 +--
.../sql/meta/store/InMemoryMetaStoreTest.java | 3 +--
.../sdk/extensions/sql/mock/MockedBoundedTable.java | 5 +++--
.../beam/sdk/extensions/sql/mock/MockedTable.java | 4 ++--
17 files changed, 40 insertions(+), 58 deletions(-)
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
index 5d484c9..6598ee3 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
@@ -23,7 +23,7 @@ import
org.apache.beam.sdk.extensions.sql.impl.schema.BeamIOType;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
/**
@@ -46,7 +46,7 @@ public interface BeamSqlTable {
* create a {@code IO.write()} instance to write to target.
*
*/
- PTransform<? super PCollection<Row>, PDone> buildIOWriter();
+ PTransform<? super PCollection<Row>, POutput> buildIOWriter();
/**
* Get the schema info of the table.
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java
index 794e3e6..15e8b96 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.extensions.sql.impl.parser;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Strings;
-import java.net.URI;
import java.util.List;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlIdentifier;
@@ -109,8 +108,8 @@ public class SqlCreateTable extends SqlCall {
return tblName.toString();
}
- public URI location() {
- return location == null ? null : URI.create(getString(location));
+ public String location() {
+ return location == null ? null : getString(location);
}
public String type() {
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java
index 5a24f47..0b10b4c 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java
@@ -22,7 +22,7 @@ import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
/**
@@ -56,7 +56,7 @@ public class BeamPCollectionTable extends BaseBeamTable {
}
@Override
- public PTransform<? super PCollection<Row>, PDone> buildIOWriter() {
+ public PTransform<? super PCollection<Row>, POutput> buildIOWriter() {
throw new IllegalArgumentException("cannot use [BeamPCollectionTable] as
target");
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java
index 4af82a0..a3339af 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java
@@ -21,7 +21,6 @@ package org.apache.beam.sdk.extensions.sql.meta;
import com.alibaba.fastjson.JSONObject;
import com.google.auto.value.AutoValue;
import java.io.Serializable;
-import java.net.URI;
import java.util.List;
import javax.annotation.Nullable;
@@ -37,7 +36,7 @@ public abstract class Table implements Serializable {
@Nullable
public abstract String getComment();
@Nullable
- public abstract URI getLocation();
+ public abstract String getLocation();
@Nullable
public abstract JSONObject getProperties();
@@ -45,14 +44,6 @@ public abstract class Table implements Serializable {
return new
org.apache.beam.sdk.extensions.sql.meta.AutoValue_Table.Builder();
}
- public String getLocationAsString() {
- if (getLocation() == null) {
- return null;
- }
-
- return "/" + getLocation().getHost() + getLocation().getPath();
- }
-
/**
* Builder class for {@link Table}.
*/
@@ -62,7 +53,7 @@ public abstract class Table implements Serializable {
public abstract Builder name(String name);
public abstract Builder columns(List<Column> columns);
public abstract Builder comment(String name);
- public abstract Builder location(URI location);
+ public abstract Builder location(String location);
public abstract Builder properties(JSONObject properties);
public abstract Table build();
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
index 4d31e4c..129e15e 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
@@ -19,7 +19,6 @@ package
org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
import static com.google.common.base.Preconditions.checkArgument;
-import java.io.Serializable;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
@@ -33,6 +32,7 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -43,7 +43,7 @@ import
org.apache.kafka.common.serialization.ByteArraySerializer;
* extend to convert between {@code BeamSqlRow} and {@code KV<byte[], byte[]>}.
*
*/
-public abstract class BeamKafkaTable extends BaseBeamTable implements
Serializable {
+public abstract class BeamKafkaTable extends BaseBeamTable {
private String bootstrapServers;
private List<String> topics;
private List<TopicPartition> topicPartitions;
@@ -109,11 +109,11 @@ public abstract class BeamKafkaTable extends
BaseBeamTable implements Serializab
}
@Override
- public PTransform<? super PCollection<Row>, PDone> buildIOWriter() {
+ public PTransform<? super PCollection<Row>, POutput> buildIOWriter() {
checkArgument(topics != null && topics.size() == 1,
"Only one topic can be acceptable as output.");
- return new PTransform<PCollection<Row>, PDone>() {
+ return new PTransform<PCollection<Row>, POutput>() {
@Override
public PDone expand(PCollection<Row> input) {
return input.apply("out_reformat",
getPTransformForOutput()).apply("persistent",
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTable.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTable.java
index e407a4d..d0b5900 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTable.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTable.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
import org.apache.commons.csv.CSVFormat;
@@ -62,7 +62,7 @@ public class BeamTextCSVTable extends BeamTextTable {
}
@Override
- public PTransform<? super PCollection<Row>, PDone> buildIOWriter() {
+ public PTransform<? super PCollection<Row>, POutput> buildIOWriter() {
return new BeamTextCSVTableIOWriter(schema, filePattern, csvFormat);
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableIOWriter.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableIOWriter.java
index d32c9dfe..aea826d 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableIOWriter.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableIOWriter.java
@@ -27,14 +27,14 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
import org.apache.commons.csv.CSVFormat;
/**
* IOWriter for {@code BeamTextCSVTable}.
*/
-public class BeamTextCSVTableIOWriter extends PTransform<PCollection<Row>,
PDone>
+public class BeamTextCSVTableIOWriter extends PTransform<PCollection<Row>,
POutput>
implements Serializable {
private String filePattern;
protected Schema schema;
@@ -49,7 +49,7 @@ public class BeamTextCSVTableIOWriter extends
PTransform<PCollection<Row>, PDone
}
@Override
- public PDone expand(PCollection<Row> input) {
+ public POutput expand(PCollection<Row> input) {
return input.apply("encodeRecord", ParDo.of(new DoFn<Row, String>() {
@ProcessElement
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextTable.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextTable.java
index 841f4e2..0a5e9ee 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextTable.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextTable.java
@@ -18,7 +18,6 @@
package org.apache.beam.sdk.extensions.sql.meta.provider.text;
-import java.io.Serializable;
import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
import org.apache.beam.sdk.extensions.sql.impl.schema.BeamIOType;
import org.apache.beam.sdk.schemas.Schema;
@@ -26,7 +25,7 @@ import org.apache.beam.sdk.schemas.Schema;
/**
* {@code BeamTextTable} represents a text file/directory(backed by {@code
TextIO}).
*/
-public abstract class BeamTextTable extends BaseBeamTable implements
Serializable {
+public abstract class BeamTextTable extends BaseBeamTable {
protected String filePattern;
protected BeamTextTable(Schema schema, String filePattern) {
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
index 6a612c4..102069a 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
@@ -40,7 +40,7 @@ import org.apache.commons.csv.CSVFormat;
* )
* TYPE 'text'
* COMMENT 'this is the table orders'
- * LOCATION 'text://home/admin/orders'
+ * LOCATION '/home/admin/orders'
* TBLPROPERTIES '{"format": "Excel"}' -- format of each text line(csv format)
* }</pre>
*/
@@ -53,7 +53,7 @@ public class TextTableProvider implements TableProvider {
@Override public BeamSqlTable buildBeamSqlTable(Table table) {
Schema schema = getRowTypeFromTable(table);
- String filePattern = table.getLocationAsString();
+ String filePattern = table.getLocation();
CSVFormat format = CSVFormat.DEFAULT;
JSONObject properties = table.getProperties();
String csvFormatStr = properties.getString("format");
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
index 9bf724d..93b29d9 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
@@ -44,7 +44,7 @@ public class BeamSqlCliTest {
+ "name varchar(31) COMMENT 'name', \n"
+ "age int COMMENT 'age') \n"
+ "TYPE 'text' \n"
- + "COMMENT '' LOCATION 'text://home/admin/orders'"
+ + "COMMENT '' LOCATION '/home/admin/orders'"
);
Table table = metaStore.getTable("person");
assertNotNull(table);
@@ -63,7 +63,7 @@ public class BeamSqlCliTest {
+ "name varchar(31) COMMENT 'name', \n"
+ "age int COMMENT 'age') \n"
+ "TYPE 'text' \n"
- + "COMMENT '' LOCATION 'text://home/admin/orders'"
+ + "COMMENT '' LOCATION '/home/admin/orders'"
);
Table table = metaStore.getTable("person");
assertNotNull(table);
@@ -86,7 +86,7 @@ public class BeamSqlCliTest {
+ "name varchar(31) COMMENT 'name', \n"
+ "age int COMMENT 'age') \n"
+ "TYPE 'text' \n"
- + "COMMENT '' LOCATION 'text://home/admin/orders'"
+ + "COMMENT '' LOCATION '/home/admin/orders'"
);
cli.execute("drop table person");
cli.explainQuery("select * from person");
@@ -107,7 +107,7 @@ public class BeamSqlCliTest {
+ "name varchar(31) COMMENT 'name', \n"
+ "age int COMMENT 'age') \n"
+ "TYPE 'text' \n"
- + "COMMENT '' LOCATION 'text://home/admin/orders'"
+ + "COMMENT '' LOCATION '/home/admin/orders'"
);
String plan = cli.explainQuery("select * from person");
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
index 14fb484..6d5ac1a 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableList;
-import java.net.URI;
import org.apache.beam.sdk.extensions.sql.RowSqlTypes;
import org.apache.beam.sdk.extensions.sql.meta.Column;
import org.apache.beam.sdk.extensions.sql.meta.Table;
@@ -50,7 +49,7 @@ public class BeamSqlParserTest {
+ "name varchar(31) COMMENT 'name') \n"
+ "TYPE 'text' \n"
+ "COMMENT 'person table' \n"
- + "LOCATION 'text://home/admin/person'\n"
+ + "LOCATION '/home/admin/person'\n"
+ "TBLPROPERTIES '{\"hello\": [\"james\", \"bond\"]}'"
);
assertEquals(
@@ -66,7 +65,7 @@ public class BeamSqlParserTest {
+ "id int COMMENT 'id', \n"
+ "name varchar(31) COMMENT 'name') \n"
+ "COMMENT 'person table' \n"
- + "LOCATION 'text://home/admin/person'\n"
+ + "LOCATION '/home/admin/person'\n"
+ "TBLPROPERTIES '{\"hello\": [\"james\", \"bond\"]}'"
);
}
@@ -84,7 +83,7 @@ public class BeamSqlParserTest {
+ "id int COMMENT 'id', \n"
+ "name varchar(31) COMMENT 'name') \n"
+ "TYPE 'text' \n"
- + "LOCATION 'text://home/admin/person'\n"
+ + "LOCATION '/home/admin/person'\n"
+ "TBLPROPERTIES '{\"hello\": [\"james\", \"bond\"]}'"
);
assertEquals(mockTable("person", "text", null, properties), table);
@@ -98,7 +97,7 @@ public class BeamSqlParserTest {
+ "name varchar(31) COMMENT 'name') \n"
+ "TYPE 'text' \n"
+ "COMMENT 'person table' \n"
- + "LOCATION 'text://home/admin/person'\n"
+ + "LOCATION '/home/admin/person'\n"
);
assertEquals(
mockTable("person", "text", "person table", new JSONObject()),
@@ -145,21 +144,17 @@ public class BeamSqlParserTest {
}
private static Table mockTable(String name, String type, String comment,
JSONObject properties) {
- return mockTable(name, type, comment, properties, "text://home/admin/" +
name);
+ return mockTable(name, type, comment, properties, "/home/admin/" + name);
}
private static Table mockTable(String name, String type, String comment,
JSONObject properties,
String location) {
- URI locationURI = null;
- if (location != null) {
- locationURI = URI.create(location);
- }
return Table.builder()
.name(name)
.type(type)
.comment(comment)
- .location(locationURI)
+ .location(location)
.columns(ImmutableList.of(
Column.builder()
.name("id")
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
index f4253dd..47d3b06 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
@@ -36,7 +36,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
import org.joda.time.DateTime;
import org.joda.time.Duration;
@@ -123,7 +123,7 @@ public class BeamJoinRelUnboundedVsBoundedTest extends
BaseRelTest {
}
@Override
- public PTransform<? super PCollection<Row>, PDone> buildIOWriter() {
+ public PTransform<? super PCollection<Row>, POutput> buildIOWriter() {
throw new UnsupportedOperationException();
}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
index ffe8dcc..58b048e 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableList;
-import java.net.URI;
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.RowSqlTypes;
import org.apache.beam.sdk.extensions.sql.meta.Column;
@@ -65,7 +64,7 @@ public class KafkaTableProviderTest {
return Table.builder()
.name(name)
.comment(name + " table")
- .location(URI.create("kafka://localhost:2181/brokers?topic=test"))
+ .location("kafka://localhost:2181/brokers?topic=test")
.columns(ImmutableList.of(
Column.builder()
.name("id")
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java
index 11ecaf4..7b47e1a 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertTrue;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableList;
-import java.net.URI;
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.RowSqlTypes;
import org.apache.beam.sdk.extensions.sql.meta.Column;
@@ -76,7 +75,7 @@ public class TextTableProviderTest {
return Table.builder()
.name(name)
.comment(name + " table")
- .location(URI.create("text://home/admin/" + name))
+ .location("/home/admin/" + name)
.columns(ImmutableList.of(
Column.builder()
.name("id")
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java
index a3e391e..2ee027a 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertThat;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableList;
-import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
@@ -129,7 +128,7 @@ public class InMemoryMetaStoreTest {
return Table.builder()
.name(name)
.comment(name + " table")
- .location(URI.create("text://home/admin/" + name))
+ .location("/home/admin/" + name)
.columns(ImmutableList.of(
Column.builder()
.name("id")
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
index 249daee..24d22e1 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
/**
@@ -104,7 +105,7 @@ public class MockedBoundedTable extends MockedTable {
"MockedBoundedTable_Reader_" + COUNTER.incrementAndGet(),
Create.of(rows));
}
- @Override public PTransform<? super PCollection<Row>, PDone> buildIOWriter()
{
+ @Override public PTransform<? super PCollection<Row>, POutput>
buildIOWriter() {
return new OutputStore();
}
@@ -112,7 +113,7 @@ public class MockedBoundedTable extends MockedTable {
* Keep output in {@code CONTENT} for validation.
*
*/
- public static class OutputStore extends PTransform<PCollection<Row>, PDone> {
+ public static class OutputStore extends PTransform<PCollection<Row>,
POutput> {
@Override
public PDone expand(PCollection<Row> input) {
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java
index 0ed77ba..cd52de0 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java
@@ -23,7 +23,7 @@ import
org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
/**
@@ -36,7 +36,7 @@ public abstract class MockedTable extends BaseBeamTable {
}
@Override
- public PTransform<? super PCollection<Row>, PDone> buildIOWriter() {
+ public PTransform<? super PCollection<Row>, POutput> buildIOWriter() {
throw new UnsupportedOperationException("buildIOWriter unsupported!");
}
}
--
To stop receiving notification emails like this one, please contact
[email protected].