Repository: beam Updated Branches: refs/heads/master a6f69bd12 -> e686286f1
Reading spanner schema transform Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1fd027b2 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1fd027b2 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1fd027b2 Branch: refs/heads/master Commit: 1fd027b206977fbe5ff6011e3b006836087f5d08 Parents: a6f69bd Author: Mairbek Khadikov <[email protected]> Authored: Wed Oct 18 15:18:55 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Fri Oct 27 14:44:30 2017 -0700 ---------------------------------------------------------------------- .../sdk/io/gcp/spanner/ReadSpannerSchema.java | 94 ++++++++++++ .../beam/sdk/io/gcp/spanner/SpannerSchema.java | 144 +++++++++++++++++++ .../io/gcp/spanner/ReadSpannerSchemaTest.java | 134 +++++++++++++++++ .../sdk/io/gcp/spanner/SpannerSchemaTest.java | 61 ++++++++ 4 files changed, 433 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1fd027b2/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchema.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchema.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchema.java new file mode 100644 index 0000000..e2ade68 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchema.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import com.google.cloud.spanner.DatabaseClient;
+import com.google.cloud.spanner.ReadOnlyTransaction;
+import com.google.cloud.spanner.ResultSet;
+import com.google.cloud.spanner.Statement;
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * This {@link DoFn} reads Cloud Spanner 'information_schema.*' tables to build the
+ * {@link SpannerSchema}.
+ *
+ * <p>For every input element it opens one read-only transaction, runs one query against
+ * {@code information_schema.columns} and one against {@code information_schema.index_columns},
+ * and outputs exactly one {@link SpannerSchema} built from the returned rows.
+ */
+class ReadSpannerSchema extends DoFn<Void, SpannerSchema> {
+
+  private final SpannerConfig config;
+
+  // Created in setup() and released in teardown(); transient, so it is not serialized.
+  private transient SpannerAccessor spannerAccessor;
+
+  /**
+   * Creates the function.
+   *
+   * @param config connection settings used by {@link #setup()} to connect to Cloud Spanner
+   */
+  public ReadSpannerSchema(SpannerConfig config) {
+    this.config = config;
+  }
+
+  /** Opens the connection to Cloud Spanner described by the config. */
+  @Setup
+  public void setup() throws Exception {
+    spannerAccessor = config.connectToSpanner();
+  }
+
+  /** Closes the connection opened by {@link #setup()}, if one was opened. */
+  @Teardown
+  public void teardown() throws Exception {
+    // The accessor is still null when setup() failed before assigning it.
+    if (spannerAccessor != null) {
+      spannerAccessor.close();
+    }
+  }
+
+  /**
+   * Reads column and primary key metadata within a single read-only transaction and outputs the
+   * resulting {@link SpannerSchema}.
+   *
+   * @param c the process context; the input element itself is not read
+   */
+  @ProcessElement
+  public void processElement(ProcessContext c) throws Exception {
+    SpannerSchema.Builder builder = SpannerSchema.builder();
+    DatabaseClient databaseClient = spannerAccessor.getDatabaseClient();
+    try (ReadOnlyTransaction tx = databaseClient.readOnlyTransaction()) {
+      // Close each result set once it has been drained instead of leaving it open.
+      try (ResultSet resultSet = readTableInfo(tx)) {
+        while (resultSet.next()) {
+          String tableName = resultSet.getString(0);
+          String columnName = resultSet.getString(1);
+          String type = resultSet.getString(2);
+
+          builder.addColumn(tableName, columnName, type);
+        }
+      }
+
+      try (ResultSet resultSet = readPrimaryKeyInfo(tx)) {
+        while (resultSet.next()) {
+          String tableName = resultSet.getString(0);
+          String columnName = resultSet.getString(1);
+          String ordering = resultSet.getString(2);
+
+          // Any ordering other than DESC (compared case-insensitively) is treated as ascending.
+          builder.addKeyPart(tableName, columnName, "DESC".equalsIgnoreCase(ordering));
+        }
+      }
+    }
+    c.output(builder.build());
+  }
+
+  /**
+   * Queries (table_name, column_name, spanner_type) for every column, ordered by table name and
+   * ordinal position.
+   */
+  private ResultSet readTableInfo(ReadOnlyTransaction tx) {
+    // NOTE(review): the empty catalog/schema filter presumably selects user-defined tables
+    // only - confirm against the Cloud Spanner information schema documentation.
+    return tx.executeQuery(Statement.of(
+        "SELECT c.table_name, c.column_name, c.spanner_type"
+            + " FROM information_schema.columns as c"
+            + " WHERE c.table_catalog = '' AND c.table_schema = ''"
+            + " ORDER BY c.table_name, c.ordinal_position"));
+  }
+
+  /**
+   * Queries (table_name, column_name, column_ordering) for the columns of every index named
+   * 'PRIMARY_KEY', ordered by table name and ordinal position.
+   */
+  private ResultSet readPrimaryKeyInfo(ReadOnlyTransaction tx) {
+    return tx.executeQuery(Statement
+        .of("SELECT t.table_name, t.column_name, t.column_ordering"
+            + " FROM information_schema.index_columns AS t "
+            + " WHERE t.index_name = 'PRIMARY_KEY' AND t.table_catalog = ''"
+            + " AND t.table_schema = ''"
+            + " ORDER BY t.table_name, t.ordinal_position"));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/1fd027b2/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java
new file mode 100644
index 0000000..4c12b8d
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.spanner.Type;
+import com.google.common.collect.ArrayListMultimap;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * Encapsulates Cloud Spanner Schema.
+ *
+ * <p>Holds, per table, the list of columns and the list of primary key parts. Table and column
+ * names are stored lower-cased, and lookups lower-case the requested table name, so access is
+ * case-insensitive.
+ */
+class SpannerSchema implements Serializable {
+  private final List<String> tables;
+  private final ArrayListMultimap<String, Column> columns;
+  private final ArrayListMultimap<String, KeyPart> keyParts;
+
+  /** Returns a new, empty {@link Builder}. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Builder for {@link SpannerSchema}.
+   */
+  static class Builder {
+    private final ArrayListMultimap<String, Column> columns = ArrayListMultimap.create();
+    private final ArrayListMultimap<String, KeyPart> keyParts = ArrayListMultimap.create();
+
+    /**
+     * Adds a column to a table.
+     *
+     * @param table table name, stored lower-cased
+     * @param name column name, stored lower-cased
+     * @param type Cloud Spanner type string, e.g. {@code INT64} or {@code STRING(MAX)}
+     * @return this builder
+     * @throws IllegalArgumentException if {@code type} is not a recognized type string
+     */
+    public Builder addColumn(String table, String name, String type) {
+      addColumn(table, Column.create(normalize(name), type));
+      return this;
+    }
+
+    private Builder addColumn(String table, Column column) {
+      columns.put(normalize(table), column);
+      return this;
+    }
+
+    /**
+     * Adds a primary key part to a table.
+     *
+     * @param table table name, stored lower-cased
+     * @param column key column name, stored lower-cased
+     * @param desc {@code true} if the key part is ordered descending
+     * @return this builder
+     */
+    public Builder addKeyPart(String table, String column, boolean desc) {
+      // Normalize the table name exactly like addColumn does, so that columns and key parts of
+      // the same table end up under the same key.
+      keyParts.put(normalize(table), KeyPart.create(normalize(column), desc));
+      return this;
+    }
+
+    /** Builds a schema from a snapshot of what has been added so far. */
+    public SpannerSchema build() {
+      // Copy, so that further use of this builder cannot change an already built schema.
+      return new SpannerSchema(
+          ArrayListMultimap.create(columns), ArrayListMultimap.create(keyParts));
+    }
+  }
+
+  private SpannerSchema(ArrayListMultimap<String, Column> columns,
+      ArrayListMultimap<String, KeyPart> keyParts) {
+    this.columns = columns;
+    this.keyParts = keyParts;
+    // Only tables that have at least one column are listed.
+    tables = new ArrayList<>(columns.keySet());
+  }
+
+  /** Lower-cases an identifier independently of the JVM default locale. */
+  private static String normalize(String identifier) {
+    return identifier.toLowerCase(Locale.ROOT);
+  }
+
+  /** Returns the (lower-cased) names of all tables that have at least one column. */
+  public List<String> getTables() {
+    return tables;
+  }
+
+  /**
+   * Returns the columns of the given table in the order they were added; empty if the table is
+   * unknown. The table name is matched case-insensitively.
+   */
+  public List<Column> getColumns(String table) {
+    return columns.get(normalize(table));
+  }
+
+  /**
+   * Returns the primary key parts of the given table in the order they were added; empty if
+   * none were added. The table name is matched case-insensitively.
+   */
+  public List<KeyPart> getKeyParts(String table) {
+    return keyParts.get(normalize(table));
+  }
+
+  /** One part of a primary key: a column name and its ordering. */
+  @AutoValue
+  abstract static class KeyPart implements Serializable {
+    static KeyPart create(String field, boolean desc) {
+      return new AutoValue_SpannerSchema_KeyPart(field, desc);
+    }
+
+    abstract String getField();
+
+    abstract boolean isDesc();
+  }
+
+  /** A column: its name and its Cloud Spanner {@link Type}. */
+  @AutoValue
+  abstract static class Column implements Serializable {
+
+    static Column create(String name, Type type) {
+      return new AutoValue_SpannerSchema_Column(name, type);
+    }
+
+    /**
+     * Creates a column from a Cloud Spanner type string.
+     *
+     * @throws IllegalArgumentException if {@code spannerType} is not a recognized type string
+     */
+    static Column create(String name, String spannerType) {
+      return create(name, parseSpannerType(spannerType));
+    }
+
+    public abstract String getName();
+
+    public abstract Type getType();
+
+    /**
+     * Converts a type string such as {@code INT64}, {@code STRING(48)} or
+     * {@code ARRAY<BYTES(MAX)>} into a {@link Type}. Matching is case-insensitive and any
+     * length suffix of STRING/BYTES is ignored.
+     */
+    private static Type parseSpannerType(String spannerType) {
+      spannerType = spannerType.toUpperCase(Locale.ROOT);
+      if (spannerType.equals("BOOL")) {
+        return Type.bool();
+      }
+      if (spannerType.equals("INT64")) {
+        return Type.int64();
+      }
+      if (spannerType.equals("FLOAT64")) {
+        return Type.float64();
+      }
+      if (spannerType.startsWith("STRING")) {
+        return Type.string();
+      }
+      if (spannerType.startsWith("BYTES")) {
+        return Type.bytes();
+      }
+      if (spannerType.equals("TIMESTAMP")) {
+        return Type.timestamp();
+      }
+      if (spannerType.equals("DATE")) {
+        return Type.date();
+      }
+
+      // Require the full "ARRAY<xxx>" shape; anything else falls through to the error below
+      // instead of failing inside substring().
+      if (spannerType.startsWith("ARRAY<") && spannerType.endsWith(">")) {
+        // Strip the leading "ARRAY<" (6 characters) and the trailing ">".
+        String spannerArrayType = spannerType.substring(6, spannerType.length() - 1);
+        Type itemType = parseSpannerType(spannerArrayType);
+        return Type.array(itemType);
+      }
+      throw new IllegalArgumentException("Unknown spanner type " + spannerType);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/1fd027b2/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchemaTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchemaTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchemaTest.java
new file mode 100644
index 0000000..25dc6dc
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchemaTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.spanner.ReadOnlyTransaction;
+import com.google.cloud.spanner.ResultSets;
+import com.google.cloud.spanner.Statement;
+import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.Type;
+import com.google.cloud.spanner.Value;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.transforms.DoFnTester;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.ArgumentMatcher;
+
+/**
+ * A test of {@link ReadSpannerSchema}.
+ */
+public class ReadSpannerSchemaTest {
+
+  @Rule
+  public final transient ExpectedException thrown = ExpectedException.none();
+
+  private FakeServiceFactory serviceFactory;
+
+  /** Builds one row shaped like the result of the column metadata query. */
+  private static Struct columnMetadata(String tableName, String columnName, String type) {
+    return Struct.newBuilder().add("table_name", Value.string(tableName))
+        .add("column_name", Value.string(columnName)).add("spanner_type", Value.string(type))
+        .build();
+  }
+
+  /** Builds one row shaped like the result of the primary key metadata query. */
+  private static Struct pkMetadata(String tableName, String columnName, String ordering) {
+    return Struct.newBuilder().add("table_name", Value.string(tableName))
+        .add("column_name", Value.string(columnName)).add("column_ordering", Value.string(ordering))
+        .build();
+  }
+
+  /**
+   * Registers a Mockito argument matcher that accepts any {@link Statement} whose SQL text
+   * contains the given fragment. Must be called inside a stubbing call such as
+   * {@code when(tx.executeQuery(...))}.
+   */
+  private static Statement statementContaining(final String sqlFragment) {
+    return argThat(new ArgumentMatcher<Statement>() {
+
+      @Override public boolean matches(Object argument) {
+        if (!(argument instanceof Statement)) {
+          return false;
+        }
+        Statement st = (Statement) argument;
+        return st.getSql().contains(sqlFragment);
+      }
+    });
+  }
+
+  /** Makes any query mentioning information_schema.columns return the given rows. */
+  private void prepareColumnMetadata(ReadOnlyTransaction tx, List<Struct> rows) {
+    Type type = Type.struct(Type.StructField.of("table_name", Type.string()),
+        Type.StructField.of("column_name", Type.string()),
+        Type.StructField.of("spanner_type", Type.string()));
+    when(tx.executeQuery(statementContaining("information_schema.columns")))
+        .thenReturn(ResultSets.forRows(type, rows));
+  }
+
+  /** Makes any query mentioning information_schema.index_columns return the given rows. */
+  private void preparePkMetadata(ReadOnlyTransaction tx, List<Struct> rows) {
+    Type type = Type.struct(Type.StructField.of("table_name", Type.string()),
+        Type.StructField.of("column_name", Type.string()),
+        Type.StructField.of("column_ordering", Type.string()));
+    when(tx.executeQuery(statementContaining("information_schema.index_columns")))
+        .thenReturn(ResultSets.forRows(type, rows));
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    serviceFactory = new FakeServiceFactory();
+  }
+
+  @Test
+  public void simple() throws Exception {
+    // Simplest schema: a table with int64 key
+    ReadOnlyTransaction tx = mock(ReadOnlyTransaction.class);
+    when(serviceFactory.mockDatabaseClient().readOnlyTransaction()).thenReturn(tx);
+
+    preparePkMetadata(tx, Arrays.asList(pkMetadata("test", "key", "ASC")));
+    prepareColumnMetadata(tx, Arrays.asList(columnMetadata("test", "key", "INT64")));
+
+    SpannerConfig config = SpannerConfig.create().withProjectId("test-project")
+        .withInstanceId("test-instance").withDatabaseId("test-database")
+        .withServiceFactory(serviceFactory);
+
+    DoFnTester<Void, SpannerSchema> tester = DoFnTester.of(new ReadSpannerSchema(config));
+    List<SpannerSchema> schemas = tester.processBundle(Arrays.asList((Void) null));
+
+    assertEquals(1, schemas.size());
+
+    SpannerSchema schema = schemas.get(0);
+
+    assertEquals(1, schema.getTables().size());
+
+    SpannerSchema.Column column = SpannerSchema.Column.create("key", Type.int64());
+    SpannerSchema.KeyPart keyPart = SpannerSchema.KeyPart.create("key", false);
+
+    assertThat(schema.getColumns("test"), contains(column));
+    assertThat(schema.getKeyParts("test"), contains(keyPart));
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/1fd027b2/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchemaTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchemaTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchemaTest.java
new file mode 100644
index 0000000..fcb23dc
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchemaTest.java
@@ -0,0
+1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link SpannerSchema} and its builder.
+ */
+public class SpannerSchemaTest {
+
+  /** One table with two columns, the first of which is the only key part. */
+  @Test
+  public void testSingleTable() throws Exception {
+    SpannerSchema.Builder builder = SpannerSchema.builder();
+    builder.addColumn("test", "pk", "STRING(48)");
+    builder.addKeyPart("test", "pk", false);
+    builder.addColumn("test", "maxKey", "STRING(MAX)");
+    SpannerSchema schema = builder.build();
+
+    assertEquals(1, schema.getTables().size());
+    assertEquals(2, schema.getColumns("test").size());
+    assertEquals(1, schema.getKeyParts("test").size());
+  }
+
+  /** Two tables registered through the same builder are kept apart. */
+  @Test
+  public void testTwoTables() throws Exception {
+    SpannerSchema.Builder builder = SpannerSchema.builder();
+
+    // First table: string key, ascending.
+    builder.addColumn("test", "pk", "STRING(48)");
+    builder.addKeyPart("test", "pk", false);
+    builder.addColumn("test", "maxKey", "STRING(MAX)");
+
+    // Second table: int64 key, descending.
+    builder.addColumn("other", "pk", "INT64");
+    builder.addKeyPart("other", "pk", true);
+    builder.addColumn("other", "maxKey", "STRING(MAX)");
+
+    SpannerSchema schema = builder.build();
+
+    assertEquals(2, schema.getTables().size());
+
+    assertEquals(2, schema.getColumns("test").size());
+    assertEquals(1, schema.getKeyParts("test").size());
+
+    assertEquals(2, schema.getColumns("other").size());
+    assertEquals(1, schema.getKeyParts("other").size());
+  }
+}
