http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-jdbc/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-jdbc/pom.xml b/tajo-storage/tajo-storage-jdbc/pom.xml new file mode 100644 index 0000000..35000a4 --- /dev/null +++ b/tajo-storage/tajo-storage-jdbc/pom.xml @@ -0,0 +1,243 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>tajo-project</artifactId> + <groupId>org.apache.tajo</groupId> + <version>0.12.0-SNAPSHOT</version> + <relativePath>../../tajo-project</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>tajo-storage-jdbc</artifactId> + <packaging>jar</packaging> + <name>Tajo JDBC storage common</name> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + </properties> + + <repositories> + <repository> + <id>repository.jboss.org</id> + <url>https://repository.jboss.org/nexus/content/repositories/releases/ + </url> + <snapshots> + <enabled>false</enabled> + </snapshots> + </repository> + </repositories> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.7</source> + <target>1.7</target> + <encoding>${project.build.sourceEncoding}</encoding> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes> + <exclude>src/test/resources/*.sql</exclude> + </excludes> + </configuration> + <executions> + <execution> + <phase>verify</phase> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <systemProperties> + <tajo.test>TRUE</tajo.test> + </systemProperties> + <argLine>-Xms512m -Xmx1024m -XX:MaxPermSize=128m -Dfile.encoding=UTF-8</argLine> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + 
<artifactId>maven-jar-plugin</artifactId> + <version>2.4</version> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <executions> + <execution> + <id>create-protobuf-generated-sources-directory</id> + <phase>initialize</phase> + <configuration> + <target> + <mkdir dir="target/generated-sources/proto" /> + </target> + </configuration> + <goals> + <goal>run</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <version>1.2</version> + <executions> + <execution> + <id>generate-sources</id> + <phase>generate-sources</phase> + <configuration> + <executable>protoc</executable> + <arguments> + <argument>-Isrc/main/proto/</argument> + <argument>--proto_path=../../tajo-common/src/main/proto</argument> + <argument>--proto_path=../../tajo-catalog/tajo-catalog-common/src/main/proto</argument> + <argument>--java_out=target/generated-sources/proto</argument> + <argument>src/main/proto/JdbcFragmentProtos.proto</argument> + </arguments> + </configuration> + <goals> + <goal>exec</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.5</version> + <executions> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>target/generated-sources/proto</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-report-plugin</artifactId> + </plugin> + </plugins> + </build> + + + <dependencies> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-common</artifactId> + 
<scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-catalog-common</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-plan</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-storage-common</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </dependency> + + </dependencies> + + <profiles> + <profile> + <id>docs</id> + <activation> + <activeByDefault>false</activeByDefault> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <executions> + <execution> + <!-- build javadoc jars per jar for publishing to maven --> + <id>module-javadocs</id> + <phase>package</phase> + <goals> + <goal>jar</goal> + </goals> + <configuration> + <destDir>${project.build.directory}</destDir> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> + + <reporting> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-report-plugin</artifactId> + <version>2.15</version> + </plugin> + </plugins> + </reporting> +</project>
http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/ConnectionInfo.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/ConnectionInfo.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/ConnectionInfo.java new file mode 100644 index 0000000..2910ea5 --- /dev/null +++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/ConnectionInfo.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.tajo.storage.jdbc;

import org.apache.tajo.exception.TajoInternalError;

import java.net.URI;
import java.util.HashMap;
import java.util.Map;

/**
 * Connection settings parsed from a JDBC URI such as
 * {@code jdbc:postgresql://host:5432/db?user=u&password=p&table=t}.
 *
 * <p>The query parameters {@code table}, {@code user} and {@code password} are moved into their
 * own fields; every other query parameter is kept in {@link #params}.
 */
public class ConnectionInfo {
  /** Everything before {@code "://"}, e.g. {@code jdbc:postgresql}. */
  String scheme;
  /** Host as reported by {@link URI#getHost()}; may be null. */
  String host;
  /** The single path element of the URI, e.g. {@code db} for {@code .../db}; null if there is no path. */
  String dbName;
  /** Value of the {@code table} query parameter, or null. */
  String tableName;
  /** Value of the {@code user} query parameter, or null. */
  String user;
  /** Value of the {@code password} query parameter, or null. */
  String password;
  /** Remaining query parameters (excluding table, user and password). */
  Map<String, String> params;

  public String scheme() {
    return scheme;
  }

  public String database() {
    return dbName;
  }

  public String table() {
    return tableName;
  }

  public String user() {
    return user;
  }

  public String password() {
    return password;
  }

  /**
   * Parses a JDBC URI string.
   *
   * @param originalUri the URI text
   * @return the parsed connection information
   */
  public static ConnectionInfo fromURI(String originalUri) {
    return fromURI(URI.create(originalUri));
  }

  /**
   * Parses a JDBC URI.
   *
   * @param originalUri the URI, e.g. {@code jdbc:postgresql://host:5432/db?user=u&table=t}
   * @return the parsed connection information
   * @throws TajoInternalError if the URI has no {@code "://"}, its path has more than one
   *         element, or a query parameter is not of the form {@code key=value}
   */
  public static ConnectionInfo fromURI(URI originalUri) {
    final String uriStr = originalUri.toASCIIString();

    final int schemeEnd = uriStr.indexOf("://");
    if (schemeEnd < 0) {
      // Without "://" there is no boundary between the scheme and the host to split on.
      throw new TajoInternalError("Invalid JDBC URI: missing '://' after the scheme");
    }

    final ConnectionInfo connInfo = new ConnectionInfo();
    connInfo.scheme = uriStr.substring(0, schemeEnd);

    // For a compound scheme such as "jdbc:postgresql", java.net.URI treats the rest as opaque and
    // reports no host/path, so drop the first scheme component and re-parse the remainder.
    URI uri = originalUri;
    if (connInfo.scheme.split(":").length > 1) {
      uri = URI.create(uriStr.substring(uriStr.indexOf(':') + 1));
    }

    connInfo.host = uri.getHost();

    final String path = uri.getPath();
    if (path != null && !path.isEmpty()) {
      final String[] pathElements = path.substring(1).split("/");
      if (pathElements.length != 1) {
        throw new TajoInternalError("Invalid JDBC path: " + path);
      }
      connInfo.dbName = pathElements[0];
    }

    final Map<String, String> params = parseParams(uriStr);
    // Map.remove returns null for an absent key, which leaves the corresponding field unset.
    connInfo.tableName = params.remove("table");
    connInfo.user = params.remove("user");
    connInfo.password = params.remove("password");
    connInfo.params = params;

    return connInfo;
  }

  /**
   * Splits the query part of {@code uriStr} (everything after the first '?') into a mutable map.
   * Only the first '=' of each pair separates key from value, so values may themselves contain
   * '='. Empty segments, such as the one produced by a trailing '&amp;', are ignored.
   *
   * @param uriStr the full URI text
   * @return key/value pairs; empty when the URI has no query part
   * @throws TajoInternalError if a segment has no '=' or an empty key
   */
  private static Map<String, String> parseParams(String uriStr) {
    final Map<String, String> params = new HashMap<>();

    final int paramIndex = uriStr.indexOf('?');
    if (paramIndex <= 0) {
      return params;
    }

    for (String each : uriStr.substring(paramIndex + 1).split("&")) {
      if (each.isEmpty()) {
        continue;
      }
      final String[] keyValue = each.split("=", 2);
      if (keyValue.length != 2 || keyValue[0].isEmpty()) {
        // The parameter text is deliberately not echoed because it may contain a password.
        throw new TajoInternalError("Invalid URI Parameters: expected key=value pairs separated by '&'");
      }
      params.put(keyValue[0], keyValue[1]);
    }
    return params;
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragment.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragment.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragment.java new file mode 100644 index 0000000..bf9536e --- /dev/null +++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragment.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.tajo.storage.jdbc;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.jdbc.JdbcFragmentProtos.JdbcFragmentProto;
import org.apache.tajo.util.TUtil;

/**
 * A fragment that identifies one JDBC table to scan by its JDBC URI.
 * {@link #getLength()} always reports 0 and {@link #isEmpty()} always reports false.
 */
public class JdbcFragment implements Fragment, Comparable<JdbcFragment>, Cloneable {
  /** JDBC URI of the scanned table; the table name travels in its {@code table} query parameter. */
  String uri;
  /** Input source identifier; reported as the table name and used as the fragment id. */
  String inputSourceId;
  /** Locality hints: the URI's host, or no entries when the URI has no host. */
  String [] hostNames;

  /**
   * Deserializes a fragment from serialized {@link JdbcFragmentProto} bytes.
   *
   * @param raw the serialized proto
   * @throws InvalidProtocolBufferException if {@code raw} is not a valid JdbcFragmentProto
   */
  public JdbcFragment(ByteString raw) throws InvalidProtocolBufferException {
    final JdbcFragmentProto.Builder builder = JdbcFragmentProto.newBuilder();
    builder.mergeFrom(raw);
    init(builder.build());
  }

  /**
   * Creates a fragment for a table URI. The URI is parsed with {@link ConnectionInfo#fromURI(String)}
   * to obtain its host, so a malformed URI is rejected here.
   *
   * @param inputSourceId input source identifier
   * @param uri JDBC URI of the table
   */
  public JdbcFragment(String inputSourceId, String uri) {
    this.inputSourceId = inputSourceId;
    this.uri = uri;
    this.hostNames = extractHosts(uri);
  }

  private void init(JdbcFragmentProto proto) {
    this.uri = proto.getUri();
    this.inputSourceId = proto.getInputSourceId();
    this.hostNames = proto.getHostsList().toArray(new String [proto.getHostsCount()]);
  }

  /**
   * Returns the URI's host as the only locality hint, or no hints when the URI has no host.
   * A null entry is never produced because {@link #getProto()} copies these hosts into a protobuf
   * repeated field, and protobuf builders reject null elements.
   */
  private String [] extractHosts(String uri) {
    final String host = ConnectionInfo.fromURI(uri).host;
    return host == null ? new String[0] : new String[] {host};
  }

  @Override
  public String getTableName() {
    return inputSourceId;
  }

  public String getUri() {
    return uri;
  }

  @Override
  public long getLength() {
    return 0;
  }

  /** Always null; NOTE(review): confirm that callers of Fragment#getKey tolerate null. */
  @Override
  public String getKey() {
    return null;
  }

  @Override
  public String[] getHosts() {
    return hostNames;
  }

  @Override
  public boolean isEmpty() {
    return false;
  }

  /** Serializes this fragment as a FragmentProto with store type {@code "JDBC"}. */
  @Override
  public CatalogProtos.FragmentProto getProto() {
    JdbcFragmentProto.Builder builder = JdbcFragmentProto.newBuilder();
    builder.setInputSourceId(this.inputSourceId);
    builder.setUri(this.uri);
    if (hostNames != null) {
      builder.addAllHosts(TUtil.newList(hostNames));
    }

    CatalogProtos.FragmentProto.Builder fragmentBuilder = CatalogProtos.FragmentProto.newBuilder();
    fragmentBuilder.setId(this.inputSourceId);
    fragmentBuilder.setStoreType("JDBC");
    fragmentBuilder.setContents(builder.buildPartial().toByteString());
    return fragmentBuilder.build();
  }

  /**
   * Orders fragments by URI. NOTE(review): equals/hashCode are not overridden, so this ordering
   * is not consistent with equals.
   */
  @Override
  public int compareTo(JdbcFragment o) {
    return this.uri.compareTo(o.uri);
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcMetadataProviderBase.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcMetadataProviderBase.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcMetadataProviderBase.java new file mode 100644 index 0000000..eff1b9c --- /dev/null +++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcMetadataProviderBase.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.tajo.storage.jdbc;

import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.exception.*;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.Pair;

import javax.annotation.Nullable;
import java.net.URI;
import java.sql.*;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import static org.apache.tajo.catalog.CatalogUtil.newSimpleDataType;

/**
 * Base metadata provider that lists and describes the tables of one database through JDBC
 * {@link DatabaseMetaData}. Subclasses name the JDBC driver class to load via
 * {@link #getJdbcDriverName()}.
 */
public abstract class JdbcMetadataProviderBase implements MetadataProvider {
  protected static final Log LOG = LogFactory.getLog(JdbcMetadataProviderBase.class);

  /** Table types passed to {@code DatabaseMetaData#getTables} when listing tables. */
  public static String [] GENERAL_TABLE_TYPES = new String [] {"TABLE"};

  protected final JdbcTablespace space;
  protected final String databaseName;

  protected final String jdbcUri;

  /** Opened in the constructor. NOTE(review): nothing in this class closes it. */
  protected final Connection connection;

  /**
   * Loads the JDBC driver named by {@link #getJdbcDriverName()} and connects to the tablespace URI.
   *
   * @param space tablespace supplying the JDBC URI and connection properties
   * @param dbName database name, passed as the catalog argument of the DatabaseMetaData lookups
   * @throws TajoInternalError if the URI is malformed, the driver cannot be loaded, or the
   *         connection cannot be opened
   */
  public JdbcMetadataProviderBase(JdbcTablespace space, String dbName) {
    this.space = space;
    this.databaseName = dbName;

    // Parsed only for validation: fromURI throws on a malformed tablespace URI.
    ConnectionInfo.fromURI(space.getUri());
    this.jdbcUri = space.getUri().toASCIIString();

    try {
      Class.forName(getJdbcDriverName()).newInstance();
      LOG.info(getJdbcDriverName() + " is loaded...");
    } catch (Exception e) {
      throw new TajoInternalError(e);
    }

    try {
      connection = DriverManager.getConnection(jdbcUri, space.connProperties);
    } catch (SQLException e) {
      throw new TajoInternalError(e);
    }
  }

  @Override
  public String getTablespaceName() {
    return space.getName();
  }

  @Override
  public URI getTablespaceUri() {
    return space.getUri();
  }

  @Override
  public String getDatabaseName() {
    return databaseName;
  }

  /** Schemas are not exposed; always returns an empty collection. */
  @Override
  public Collection<String> getSchemas() {
    return Collections.emptySet();
  }

  /**
   * Lists the names of tables (of the types in {@link #GENERAL_TABLE_TYPES}) in this database.
   *
   * @param schemaPattern JDBC schema pattern, or null for no filtering
   * @param tablePattern JDBC table-name pattern, or null for no filtering
   * @return table names in the order returned by the driver
   * @throws TajoInternalError on any SQLException from the metadata lookup
   */
  @Override
  public Collection<String> getTables(@Nullable String schemaPattern, @Nullable String tablePattern) {
    final List<String> tableNames = Lists.newArrayList();
    ResultSet res = null;
    try {
      res = connection.getMetaData().getTables(databaseName, schemaPattern, tablePattern, GENERAL_TABLE_TYPES);
      while (res.next()) {
        tableNames.add(res.getString("TABLE_NAME"));
      }
    } catch (SQLException e) {
      throw new TajoInternalError(e);
    } finally {
      closeQuietly(res);
    }

    return tableNames;
  }

  /**
   * Maps the {@code DATA_TYPE} column of a DatabaseMetaData#getColumns row to a Tajo type.
   * Note that NUMERIC/DECIMAL are mapped to FLOAT8, and TINYINT/SMALLINT to INT4.
   *
   * @throws SQLException wrapping an UnsupportedDataTypeException for unmapped JDBC types
   */
  private TypeDesc convertDataType(ResultSet res) throws SQLException {
    final int typeId = res.getInt("DATA_TYPE");

    switch (typeId) {
      case Types.BOOLEAN:
        return new TypeDesc(newSimpleDataType(Type.BOOLEAN));

      case Types.TINYINT:
      case Types.SMALLINT:
      case Types.INTEGER:
        return new TypeDesc(newSimpleDataType(Type.INT4));

      case Types.DISTINCT: // sequence for postgresql
      case Types.BIGINT:
        return new TypeDesc(newSimpleDataType(Type.INT8));

      case Types.FLOAT:
        return new TypeDesc(newSimpleDataType(Type.FLOAT4));

      case Types.NUMERIC:
      case Types.DECIMAL:
      case Types.DOUBLE:
        return new TypeDesc(newSimpleDataType(Type.FLOAT8));

      case Types.DATE:
        return new TypeDesc(newSimpleDataType(Type.DATE));

      case Types.TIME:
        return new TypeDesc(newSimpleDataType(Type.TIME));

      case Types.TIMESTAMP:
        return new TypeDesc(newSimpleDataType(Type.TIMESTAMP));

      case Types.CHAR:
      case Types.NCHAR:
      case Types.VARCHAR:
      case Types.NVARCHAR:
      case Types.CLOB:
      case Types.NCLOB:
      case Types.LONGVARCHAR:
      case Types.LONGNVARCHAR:
        return new TypeDesc(newSimpleDataType(Type.TEXT));

      case Types.BINARY:
      case Types.VARBINARY:
      case Types.BLOB:
        return new TypeDesc(newSimpleDataType(Type.BLOB));

      default:
        throw SQLExceptionUtil.toSQLException(new UnsupportedDataTypeException(typeId + ""));
    }
  }

  /**
   * Builds a TableDesc for one table from JDBC metadata. Columns are ordered by
   * {@code ORDINAL_POSITION}; the row count is recorded as unknown (-1).
   *
   * @param schemaName JDBC schema pattern, may be null
   * @param tableName JDBC table-name pattern
   * @return the table description
   * @throws UndefinedTablespaceException if no table matches {@code tableName}
   * @throws TajoInternalError on any SQLException, including unsupported column types
   */
  @Override
  public TableDesc getTableDesc(String schemaName, String tableName) throws UndefinedTablespaceException {
    ResultSet resultForTable = null;
    ResultSet resultForColumns = null;
    try {
      final DatabaseMetaData metaData = connection.getMetaData();

      // get table name
      resultForTable = metaData.getTables(databaseName, schemaName, tableName, null);

      if (!resultForTable.next()) {
        throw new UndefinedTablespaceException(tableName);
      }
      final String name = resultForTable.getString("TABLE_NAME");

      // get columns
      resultForColumns = metaData.getColumns(databaseName, schemaName, tableName, null);

      final List<Pair<Integer, Column>> columns = Lists.newArrayList();

      while (resultForColumns.next()) {
        final int ordinalPos = resultForColumns.getInt("ORDINAL_POSITION");
        final String qualifier = resultForColumns.getString("TABLE_NAME");
        final String columnName = resultForColumns.getString("COLUMN_NAME");
        final TypeDesc type = convertDataType(resultForColumns);
        final Column c = new Column(CatalogUtil.buildFQName(databaseName, qualifier, columnName), type);

        columns.add(new Pair<>(ordinalPos, c));
      }

      // sort columns in an order of ordinal position
      Collections.sort(columns, new Comparator<Pair<Integer, Column>>() {
        @Override
        public int compare(Pair<Integer, Column> o1, Pair<Integer, Column> o2) {
          return Integer.compare(o1.getFirst(), o2.getFirst());
        }
      });

      // transform the pair list into collection for columns
      final Schema schema = new Schema(Collections2.transform(columns, new Function<Pair<Integer,Column>, Column>() {
        @Override
        public Column apply(@Nullable Pair<Integer, Column> columnPair) {
          return columnPair.getSecond();
        }
      }));

      // fill the table stats
      final TableStats stats = new TableStats();
      stats.setNumRows(-1); // unknown

      final TableDesc table = new TableDesc(
          CatalogUtil.buildFQName(databaseName, name),
          schema,
          new TableMeta("rowstore", new KeyValueSet()),
          space.getTableUri(databaseName, name)
      );
      table.setStats(stats);

      return table;

    } catch (SQLException e) {
      throw new TajoInternalError(e);
    } finally {
      // Closed independently so that a failure closing one result set cannot leak the other.
      closeQuietly(resultForColumns);
      closeQuietly(resultForTable);
    }
  }

  /** Closes {@code resultSet}, logging rather than propagating any failure; null is ignored. */
  private static void closeQuietly(@Nullable ResultSet resultSet) {
    if (resultSet == null) {
      return;
    }
    try {
      resultSet.close();
    } catch (SQLException e) {
      LOG.warn(e);
    }
  }

  /** @return fully qualified class name of the JDBC driver to load */
  protected abstract String getJdbcDriverName();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java new file mode 100644 index 0000000..82c3be3 --- /dev/null +++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java @@ -0,0 +1,334 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.tajo.storage.jdbc;

import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.datum.TimeDatum;
import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.exception.TajoRuntimeException;
import org.apache.tajo.exception.UnsupportedDataTypeException;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.plan.logical.LogicalNode;
import org.apache.tajo.storage.Scanner;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;

import java.io.Closeable;
import java.io.IOException;
import java.sql.*;
import java.util.Iterator;
import java.util.Properties;

/**
 * Base scanner that reads a JDBC table by generating a SQL query with {@link SQLBuilder} and
 * streaming the query's ResultSet into Tajo tuples. The query runs lazily on the first
 * {@link #next()} over a connection opened from the fragment's URI.
 */
public abstract class JdbcScanner implements Scanner {
  private static final Log LOG = LogFactory.getLog(JdbcScanner.class);

  protected final DatabaseMetaData dbMetaData;
  /** JDBC Connection Properties */
  protected final Properties connProperties;
  protected final String tableName;
  protected final Schema schema;
  protected final TableMeta tableMeta;
  protected final JdbcFragment fragment;
  protected final TableStats stats;
  protected final SQLBuilder builder;

  protected Column [] targets;
  protected EvalNode filter;
  protected Long limit;
  protected LogicalNode planPart;
  protected VTuple outTuple;
  protected String generatedSql;
  protected ResultSetIterator iter;

  protected int recordCount = 0;

  /**
   *
   * @param dbMetaData DatabaseMetaData
   * @param connProperties JDBC Connection Properties
   * @param tableSchema Table Schema
   * @param tableMeta Table Properties
   * @param fragment Fragment; its URI's {@code table} parameter names the table to scan
   */
  public JdbcScanner(final DatabaseMetaData dbMetaData,
                     final Properties connProperties,
                     final Schema tableSchema,
                     final TableMeta tableMeta,
                     final JdbcFragment fragment) {

    Preconditions.checkNotNull(dbMetaData);
    Preconditions.checkNotNull(connProperties);
    Preconditions.checkNotNull(tableSchema);
    Preconditions.checkNotNull(tableMeta);
    Preconditions.checkNotNull(fragment);

    this.dbMetaData = dbMetaData;
    this.connProperties = connProperties;
    this.tableName = ConnectionInfo.fromURI(fragment.getUri()).tableName;
    this.schema = tableSchema;
    this.tableMeta = tableMeta;
    this.fragment = fragment;
    this.stats = new TableStats();
    builder = getSQLBuilder();
  }

  /** Fixes the projected columns (all schema columns by default) and generates the SQL text. */
  @Override
  public void init() throws IOException {
    if (targets == null) {
      targets = schema.toArray();
    }
    outTuple = new VTuple(targets.length);

    if (planPart == null) {
      generatedSql = builder.build(tableName, targets, filter, limit);
    } else {
      generatedSql = builder.build(planPart);
    }
  }

  /**
   * Returns the next row, or null at the end. The same tuple instance is reused for every row.
   */
  @Override
  public Tuple next() throws IOException {
    if (iter == null) {
      iter = executeQueryAndGetIter();
    }

    if (iter.hasNext()) {
      return iter.next();
    } else {
      return null;
    }
  }

  /**
   * Restarts the scan from the first row. The ResultSet is forward-only, so the current query
   * resources are released and the query is re-executed on the next call to {@link #next()}.
   */
  @Override
  public void reset() throws IOException {
    if (iter != null) {
      iter.close();
      iter = null;
    }
    recordCount = 0;
  }

  /** Releases the result set, statement and connection, and records the row count in the stats. */
  @Override
  public void close() throws IOException {
    if (iter != null) {
      iter.close();
    }
  }

  @Override
  public void pushOperators(LogicalNode planPart) {
    this.planPart = planPart;
  }

  @Override
  public boolean isProjectable() {
    return true;
  }

  @Override
  public void setTarget(Column [] targets) {
    this.targets = targets;
  }

  @Override
  public boolean isSelectable() {
    return true;
  }

  @Override
  public void setFilter(EvalNode filter) {
    this.filter = filter;
  }

  @Override
  public void setLimit(long num) {
    this.limit = num;
  }

  @Override
  public boolean isSplittable() {
    return false;
  }

  @Override
  public float getProgress() {
    return 0;
  }

  @Override
  public TableStats getInputStats() {
    return stats;
  }

  @Override
  public Schema getSchema() {
    return schema;
  }

  protected SQLBuilder getSQLBuilder() {
    return new SQLBuilder(dbMetaData, getSQLExprBuilder());
  }

  protected SQLExpressionGenerator getSQLExprBuilder() {
    return new SQLExpressionGenerator(dbMetaData);
  }

  /**
   * Copies the current row of {@code resultSet} into {@code tuple}, one datum per target column.
   * SQL NULL values become null datums.
   *
   * @throws TajoInternalError on SQLException or on a target type this scanner cannot convert
   */
  protected void convertTuple(ResultSet resultSet, VTuple tuple) {
    try {
      for (int columnIdx = 0; columnIdx < targets.length; columnIdx++) {
        // JDBC result columns are 1-based.
        tuple.put(columnIdx, toDatum(resultSet, columnIdx + 1, targets[columnIdx]));
      }
    } catch (SQLException s) {
      throw new TajoInternalError(s);
    }
  }

  /**
   * Reads one column of the current row as the Tajo type of {@code column}.
   * Primitive getters return 0/false for SQL NULL, so {@link ResultSet#wasNull()} is checked
   * after them; object getters return null for SQL NULL.
   */
  private Datum toDatum(ResultSet resultSet, int resultIdx, Column column) throws SQLException {
    switch (column.getDataType().getType()) {
      case BOOLEAN: {
        final boolean value = resultSet.getBoolean(resultIdx);
        return resultSet.wasNull() ? DatumFactory.createNullDatum() : DatumFactory.createBool(value);
      }
      case INT1:
      case INT2: {
        final short value = resultSet.getShort(resultIdx);
        return resultSet.wasNull() ? DatumFactory.createNullDatum() : DatumFactory.createInt2(value);
      }
      case INT4: {
        final int value = resultSet.getInt(resultIdx);
        return resultSet.wasNull() ? DatumFactory.createNullDatum() : DatumFactory.createInt4(value);
      }
      case INT8: {
        final long value = resultSet.getLong(resultIdx);
        return resultSet.wasNull() ? DatumFactory.createNullDatum() : DatumFactory.createInt8(value);
      }
      case FLOAT4: {
        final float value = resultSet.getFloat(resultIdx);
        return resultSet.wasNull() ? DatumFactory.createNullDatum() : DatumFactory.createFloat4(value);
      }
      case FLOAT8: {
        final double value = resultSet.getDouble(resultIdx);
        return resultSet.wasNull() ? DatumFactory.createNullDatum() : DatumFactory.createFloat8(value);
      }
      case CHAR: {
        final String value = resultSet.getString(resultIdx);
        return value == null ? DatumFactory.createNullDatum() : DatumFactory.createText(value);
      }
      case VARCHAR:
      case TEXT: {
        final String value = resultSet.getString(resultIdx);
        // TODO - trim is unnecessary in many cases, so we can use it for certain cases
        return value == null ? DatumFactory.createNullDatum() : DatumFactory.createText(value.trim());
      }
      case DATE: {
        final Date date = resultSet.getDate(resultIdx);
        // java.sql.Date's deprecated getters: the year is an offset from 1900 and the month is 0-based.
        return date == null
            ? DatumFactory.createNullDatum()
            : DatumFactory.createDate(1900 + date.getYear(), 1 + date.getMonth(), date.getDate());
      }
      case TIME: {
        final Time time = resultSet.getTime(resultIdx);
        // Scaled by 1000 from Time#getTime() milliseconds; TimeDatum presumably expects microseconds.
        return time == null ? DatumFactory.createNullDatum() : new TimeDatum(time.getTime() * 1000);
      }
      case TIMESTAMP: {
        final Timestamp timestamp = resultSet.getTimestamp(resultIdx);
        return timestamp == null
            ? DatumFactory.createNullDatum()
            : DatumFactory.createTimestmpDatumWithJavaMillis(timestamp.getTime());
      }
      case BINARY:
      case VARBINARY:
      case BLOB: {
        final byte[] bytes = resultSet.getBytes(resultIdx);
        return bytes == null ? DatumFactory.createNullDatum() : DatumFactory.createBlob(bytes);
      }
      default:
        throw new TajoInternalError(new UnsupportedDataTypeException(column.getDataType().getType().name()));
    }
  }

  /**
   * Opens a connection to the fragment URI, runs {@link #generatedSql} and wraps the result.
   * The returned iterator owns the connection and statement; on failure both are closed here.
   */
  private ResultSetIterator executeQueryAndGetIter() {
    LOG.info("Generated SQL: " + generatedSql);
    Connection conn = null;
    Statement statement = null;
    try {
      conn = DriverManager.getConnection(fragment.uri, connProperties);
      statement = conn.createStatement();
      final ResultSet resultSet = statement.executeQuery(generatedSql);
      return new ResultSetIterator(resultSet, statement, conn);
    } catch (SQLException s) {
      closeQuietly(statement);
      closeQuietly(conn);
      throw new TajoInternalError(s);
    }
  }

  /** Closes a JDBC resource, logging rather than propagating any failure; null is ignored. */
  private static void closeQuietly(AutoCloseable resource) {
    if (resource == null) {
      return;
    }
    try {
      resource.close();
    } catch (Exception e) {
      LOG.warn(e);
    }
  }

  /**
   * Iterates over the rows of a ResultSet, converting each row into the scanner's reused
   * {@code outTuple}. Closing it also closes the statement and connection it was given, if any,
   * and records the number of rows read in the scanner's stats.
   */
  public class ResultSetIterator implements Iterator<Tuple>, Closeable {

    private final ResultSet resultSet;
    /** Statement that produced {@link #resultSet}; null when owned by the caller. */
    private final Statement statement;
    /** Connection that produced {@link #statement}; null when owned by the caller. */
    private final Connection connection;

    private boolean didNext = false;
    private boolean hasNext = false;

    /** Wraps a result set whose statement and connection are managed by the caller. */
    public ResultSetIterator(ResultSet resultSet) {
      this(resultSet, null, null);
    }

    /** Wraps a result set and takes ownership of the statement and connection that produced it. */
    ResultSetIterator(ResultSet resultSet, Statement statement, Connection connection) {
      this.resultSet = resultSet;
      this.statement = statement;
      this.connection = connection;
    }

    @Override
    public boolean hasNext() {
      if (!didNext) {

        try {
          hasNext = resultSet.next();
        } catch (SQLException e) {
          throw new RuntimeException(e);
        }

        didNext = true;
      }
      return hasNext;
    }

    @Override
    public Tuple next() {
      if (!didNext) {
        try {
          resultSet.next();
        } catch (SQLException e) {
          throw new RuntimeException(e);
        }
      }
      didNext = false;
      convertTuple(resultSet, outTuple);
      recordCount++;
      return outTuple;
    }

    @Override
    public void remove() {
      throw new TajoRuntimeException(new UnsupportedException());
    }

    /**
     * Moves the cursor back before the first row. This requires a scrollable ResultSet; a
     * forward-only one (the default of {@code Connection#createStatement()}) makes
     * {@code beforeFirst()} throw, surfacing here as a TajoInternalError.
     */
    public void rewind() {
      try {
        resultSet.beforeFirst();
      } catch (SQLException e) {
        throw new TajoInternalError(e);
      }
      didNext = false;
      hasNext = false;
    }

    @Override
    public void close() throws IOException {
      closeQuietly(resultSet);
      closeQuietly(statement);
      closeQuietly(connection);

      if (stats != null) {
        stats.setNumRows(recordCount);
      }
    }
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java new file mode 100644 index 0000000..7489307 --- /dev/null +++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.tajo.storage.jdbc;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import net.minidev.json.JSONObject;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.OverridableConf;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.exception.NotImplementedException;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.util.UriUtil;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.net.URI;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * JDBC Tablespace: base class of tablespaces backed by a database reached through JDBC.
+ *
+ * <p>One {@link Connection} is opened in {@link #storageInit()} and closed in {@link #close()}.
+ * Each table is exposed as a single {@link JdbcFragment}. Table creation, purge, commit,
+ * rollback and insert-range computation are not implemented. Concrete subclasses supply
+ * the {@link MetadataProvider} and the {@link Scanner}.
+ */
+public abstract class JdbcTablespace extends Tablespace {
+  private static final Log LOG = LogFactory.getLog(JdbcTablespace.class);
+
+  static final StorageProperty STORAGE_PROPERTY = new StorageProperty("rowstore", false, true, false, true);
+  static final FormatProperty FORMAT_PROPERTY = new FormatProperty(false, false, false);
+
+  /**
+   * Database this tablespace is mapped to. When the key is absent, the database
+   * named in the tablespace URI is used instead.
+   */
+  public static final String CONFIG_KEY_MAPPED_DATABASE = "mapped_database";
+  /**
+   * optional configuration: a JSON object whose entries are handed to the JDBC driver
+   * as connection properties
+   */
+  public static final String CONFIG_KEY_CONN_PROPERTIES = "connection_properties";
+
+  /** URI query parameter that carries the table name (see {@link #getTableUri}). */
+  public static final String URI_PARAM_KEY_TABLE = "table";
+
+  protected Connection conn;
+  protected String database;
+  protected Properties connProperties = new Properties();
+
+  public JdbcTablespace(String name, URI uri, JSONObject config) {
+    super(name, uri, config);
+    setDatabase();
+    setJdbcProperties();
+  }
+
+  /** Resolves {@link #database} from the config, falling back to the URI's database. */
+  private void setDatabase() {
+    if (config.containsKey(CONFIG_KEY_MAPPED_DATABASE)) {
+      database = this.config.getAsString(CONFIG_KEY_MAPPED_DATABASE);
+    } else {
+      database = ConnectionInfo.fromURI(uri).database();
+    }
+  }
+
+  /**
+   * Copies the optional {@link #CONFIG_KEY_CONN_PROPERTIES} object into {@link #connProperties}.
+   *
+   * @throws IllegalStateException if the configured value is not a JSON object
+   */
+  private void setJdbcProperties() {
+    Object connPropertiesObjects = config.get(CONFIG_KEY_CONN_PROPERTIES);
+    if (connPropertiesObjects != null) {
+      Preconditions.checkState(connPropertiesObjects instanceof JSONObject,
+          "Invalid " + CONFIG_KEY_CONN_PROPERTIES + " field in configs");
+      JSONObject jsonProps = (JSONObject) connPropertiesObjects;
+
+      for (Map.Entry<String, Object> entry : jsonProps.entrySet()) {
+        Object value = entry.getValue();
+        // Properties#getProperty (used by JDBC drivers) returns null for non-String values,
+        // so JSON numbers/booleans must be stored as strings to take effect. JSON nulls are
+        // skipped because Properties cannot hold null values.
+        if (value != null) {
+          this.connProperties.setProperty(entry.getKey(), String.valueOf(value));
+        }
+      }
+    }
+  }
+
+  /**
+   * Opens the tablespace connection from the tablespace URI and connection properties.
+   *
+   * @throws IOException wrapping the driver's {@link SQLException}
+   */
+  @Override
+  protected void storageInit() throws IOException {
+    try {
+      this.conn = DriverManager.getConnection(uri.toASCIIString(), connProperties);
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public long getTableVolume(URI uri) throws UnsupportedException {
+    throw new UnsupportedException();
+  }
+
+  /**
+   * Builds a table URI by appending the table name as the {@value #URI_PARAM_KEY_TABLE}
+   * query parameter to the tablespace URI. {@code databaseName} is not used.
+   */
+  @Override
+  public URI getTableUri(String databaseName, String tableName) {
+    return URI.create(UriUtil.addParam(getUri().toASCIIString(), URI_PARAM_KEY_TABLE, tableName));
+  }
+
+  /** Returns exactly one fragment covering the whole table; {@code filterCondition} is ignored. */
+  @Override
+  public List<Fragment> getSplits(String inputSourceId,
+                                  TableDesc tableDesc,
+                                  @Nullable EvalNode filterCondition) throws IOException {
+    return Lists.newArrayList((Fragment)new JdbcFragment(inputSourceId, tableDesc.getUri().toASCIIString()));
+  }
+
+  @Override
+  public StorageProperty getProperty() {
+    return STORAGE_PROPERTY;
+  }
+
+  @Override
+  public FormatProperty getFormatProperty(TableMeta meta) {
+    return FORMAT_PROPERTY;
+  }
+
+  /** Closes the tablespace connection if one was opened; close failures are only logged. */
+  @Override
+  public void close() {
+    if (conn != null) {
+      try {
+        conn.close();
+      } catch (SQLException e) {
+        LOG.warn(e);
+      }
+    }
+  }
+
+  @Override
+  public TupleRange[] getInsertSortRanges(OverridableConf queryContext,
+                                          TableDesc tableDesc,
+                                          Schema inputSchema,
+                                          SortSpec[] sortSpecs,
+                                          TupleRange dataRange) throws IOException {
+    throw new TajoRuntimeException(new NotImplementedException());
+  }
+
+  @Override
+  public void verifySchemaToWrite(TableDesc tableDesc, Schema outSchema) {
+    throw new TajoRuntimeException(new NotImplementedException());
+  }
+
+  @Override
+  public void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException {
+    throw new TajoRuntimeException(new NotImplementedException());
+  }
+
+  @Override
+  public void purgeTable(TableDesc tableDesc) throws IOException {
+    throw new TajoRuntimeException(new NotImplementedException());
+  }
+
+  @Override
+  public void prepareTable(LogicalNode node) throws IOException {
+    throw new TajoRuntimeException(new NotImplementedException());
+  }
+
+  @Override
+  public Path commitTable(OverridableConf queryContext, ExecutionBlockId finalEbId, LogicalPlan plan, Schema schema,
+                          TableDesc tableDesc) throws IOException {
+    throw new TajoRuntimeException(new NotImplementedException());
+  }
+
+  @Override
+  public void rollbackTable(LogicalNode node) throws IOException {
+    throw new TajoRuntimeException(new NotImplementedException());
+  }
+
+  @Override
+  public URI getStagingUri(OverridableConf context, String queryId, TableMeta meta) throws IOException {
+    throw new TajoRuntimeException(new UnsupportedException());
+  }
+
+  public abstract MetadataProvider getMetadataProvider();
+
+  @Override
+  public abstract Scanner getScanner(TableMeta meta,
+                                     Schema schema,
+                                     Fragment fragment,
+                                     @Nullable Schema target) throws IOException;
+
+  /**
+   * Returns the metadata of the tablespace connection.
+   *
+   * @throws IllegalStateException if {@link #storageInit()} has not opened the connection yet
+   * @throws TajoInternalError wrapping any {@link SQLException} from the driver
+   */
+  public DatabaseMetaData getDatabaseMetaData() {
+    Preconditions.checkState(conn != null, "JDBC connection is not initialized");
+    try {
+      return conn.getMetaData();
+    } catch (SQLException e) {
+      throw new TajoInternalError(e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLBuilder.java
---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLBuilder.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLBuilder.java new file mode 100644 index 0000000..f72d6ee --- /dev/null +++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLBuilder.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.tajo.storage.jdbc;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.plan.Target;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.util.StringUtils;
+
+import javax.annotation.Nullable;
+import java.sql.DatabaseMetaData;
+import java.util.Stack;
+
+/**
+ * Generator to build a SQL statement from a plan fragment
+ */
+public class SQLBuilder {
+  @SuppressWarnings("unused")
+  private final DatabaseMetaData dbMetaData;
+  private final SQLExpressionGenerator sqlExprGen;
+
+  /** Output buffer shared by the plan visitor while it emits one SQL statement. */
+  public static class SQLBuilderContext {
+    // Must be non-null: every visit method appends to it directly.
+    StringBuilder sb = new StringBuilder();
+  }
+
+  public SQLBuilder(DatabaseMetaData dbMetaData, SQLExpressionGenerator exprGen) {
+    this.dbMetaData = dbMetaData;
+    this.sqlExprGen = exprGen;
+  }
+
+  /**
+   * Builds {@code SELECT <targets> FROM <tableName> [WHERE <filter>] [LIMIT <limit>]}.
+   *
+   * @param tableName table to read
+   * @param targets   columns to select by simple name; when empty, {@code SELECT 1} is emitted
+   * @param filter    optional predicate rendered by {@link SQLExpressionGenerator}
+   * @param limit     optional row limit
+   * @return the SQL statement
+   */
+  public String build(String tableName, Column [] targets, @Nullable EvalNode filter, @Nullable Long limit) {
+
+    StringBuilder selectClause = new StringBuilder("SELECT ");
+    if (targets.length > 0) {
+      selectClause.append(StringUtils.join(targets, ",", new Function<Column, String>() {
+        @Override
+        public String apply(@Nullable Column input) {
+          return input.getSimpleName();
+        }
+      }));
+    } else {
+      selectClause.append("1");
+    }
+    selectClause.append(" ");
+
+    StringBuilder fromClause = new StringBuilder("FROM ");
+    fromClause.append(tableName).append(" ");
+
+    StringBuilder whereClause = null;
+    if (filter != null) {
+      whereClause = new StringBuilder("WHERE ");
+      whereClause.append(sqlExprGen.generate(filter)).append(" ");
+    }
+
+    StringBuilder limitClause = null;
+    if (limit != null) {
+      limitClause = new StringBuilder("LIMIT ");
+      limitClause.append(limit).append(" ");
+    }
+
+    return generateSelectStmt(selectClause, fromClause, whereClause, limitClause);
+  }
+
+  /** Concatenates the given clauses in order, skipping the optional ones that are null. */
+  public String generateSelectStmt(StringBuilder selectClause,
+                                   StringBuilder fromClause,
+                                   @Nullable StringBuilder whereClause,
+                                   @Nullable StringBuilder limitClause) {
+    return
+        selectClause.toString() +
+        fromClause.toString() +
+        (whereClause != null ? whereClause.toString() : "") +
+        (limitClause != null ? limitClause.toString() : "");
+  }
+
+  /** Renders a logical plan fragment as SQL by visiting it depth-first. */
+  public String build(LogicalNode planPart) {
+    SQLBuilderContext context = new SQLBuilderContext();
+    visit(context, planPart, new Stack<LogicalNode>());
+    return context.sb.toString();
+  }
+
+  /**
+   * Dispatches on the node type.
+   *
+   * @throws TajoRuntimeException wrapping {@link UnsupportedException} for unsupported node types
+   */
+  public void visit(SQLBuilderContext context, LogicalNode node, Stack<LogicalNode> stack) {
+    stack.push(node);
+
+    switch (node.getType()) {
+    case SCAN:
+      visitScan(context, (ScanNode) node, stack);
+      break;
+
+    case GROUP_BY:
+      visitGroupBy(context, (GroupbyNode) node, stack);
+      break;
+
+    case SELECTION:
+      visitFilter(context, (SelectionNode) node, stack);
+      break;
+
+    case PROJECTION:
+      visitProjection(context, (ProjectionNode) node, stack);
+      break;
+
+    case TABLE_SUBQUERY:
+      visitDerivedSubquery(context, (TableSubQueryNode) node, stack);
+      break;
+
+    default:
+      throw new TajoRuntimeException(new UnsupportedException("plan node '" + node.getType().name() + "'"));
+    }
+
+    stack.pop();
+  }
+
+  public void visitDerivedSubquery(SQLBuilderContext ctx, TableSubQueryNode derivedSubquery, Stack<LogicalNode> stack) {
+    ctx.sb.append(" (");
+    visit(ctx, derivedSubquery.getSubQuery(), stack);
+    ctx.sb.append(" ) ").append(derivedSubquery.getTableName());
+  }
+
+  public void visitProjection(SQLBuilderContext ctx, ProjectionNode projection, Stack<LogicalNode> stack) {
+    // NOTE(review): the projection's own targets are not emitted; only the child is rendered.
+    visit(ctx, projection.getChild(), stack);
+  }
+
+  public void visitGroupBy(SQLBuilderContext ctx, GroupbyNode groupby, Stack<LogicalNode> stack) {
+    visit(ctx, groupby.getChild(), stack);
+    // NOTE(review): joins the grouping columns via their toString(); confirm that yields plain column names.
+    ctx.sb.append("GROUP BY ").append(StringUtils.join(groupby.getGroupingColumns(), ",", 0)).append(" ");
+  }
+
+  public void visitFilter(SQLBuilderContext ctx, SelectionNode filter, Stack<LogicalNode> stack) {
+    visit(ctx, filter.getChild(), stack);
+    // NOTE(review): a child scan that has its own qual also emits WHERE, yielding two WHERE clauses.
+    ctx.sb.append("WHERE ").append(sqlExprGen.generate(filter.getQual()));
+  }
+
+  /** Emits {@code SELECT <targets> FROM <table> [AS <alias>] [WHERE <qual>]} for a scan. */
+  public void visitScan(SQLBuilderContext ctx, ScanNode scan, Stack<LogicalNode> stack) {
+
+    StringBuilder selectClause = new StringBuilder("SELECT ");
+    if (scan.getTargets().length > 0) {
+      selectClause.append(generateTargetList(scan.getTargets()));
+    } else {
+      selectClause.append("1");
+    }
+
+    selectClause.append(" ");
+
+    // The select list has to be written ahead of the FROM clause.
+    ctx.sb.append(selectClause);
+    ctx.sb.append("FROM ").append(scan.getTableName()).append(" ");
+
+    if (scan.hasAlias()) {
+      ctx.sb.append("AS ").append(scan.getAlias()).append(" ");
+    }
+
+    if (scan.hasQual()) {
+      ctx.sb.append("WHERE ").append(sqlExprGen.generate(scan.getQual()));
+    }
+  }
+
+  /** Renders targets as a comma-separated list of expressions with optional {@code AS} aliases. */
+  public String generateTargetList(Target [] targets) {
+    return StringUtils.join(targets, ",", new Function<Target, String>() {
+      @Override
+      public String apply(@Nullable Target t) {
+        StringBuilder sb = new StringBuilder(sqlExprGen.generate(t.getEvalTree()));
+        if (t.hasAlias()) {
+          sb.append(" AS ").append(t.getAlias());
+        }
+        return sb.toString();
+      }
+    });
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLExpressionGenerator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLExpressionGenerator.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLExpressionGenerator.java
new file mode 100644
index 0000000..4d4c781
--- /dev/null
+++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLExpressionGenerator.java
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.jdbc; + +import com.google.common.base.Function; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.common.TajoDataTypes.DataType; +import org.apache.tajo.datum.BooleanDatum; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.exception.NotImplementedException; +import org.apache.tajo.exception.TajoRuntimeException; +import org.apache.tajo.exception.UnsupportedDataTypeException; +import org.apache.tajo.plan.expr.*; +import org.apache.tajo.util.StringUtils; + +import java.sql.DatabaseMetaData; +import java.sql.SQLException; +import java.util.Stack; + +/** + * A generator to build a SQL representation from a sql expression + */ +public class SQLExpressionGenerator extends SimpleEvalNodeVisitor<SQLExpressionGenerator.Context> { + final private DatabaseMetaData dbMetaData; + + private final String LITERAL_QUOTE = "'"; + @SuppressWarnings("unused") + private final String DEFAULT_LITERAL_QUOTE = "'"; + @SuppressWarnings("unused") + private String IDENTIFIER_QUOTE = "\""; + private final String DEFAULT_IDENTIFIER_QUOTE = "\""; + + public SQLExpressionGenerator(DatabaseMetaData dbMetaData) { + this.dbMetaData = dbMetaData; + initDatabaseDependentSQLRepr(); + } + + private void initDatabaseDependentSQLRepr() { + String quoteStr = null; + try { + quoteStr = dbMetaData.getIdentifierQuoteString(); + } catch (SQLException e) { + } + 
this.IDENTIFIER_QUOTE = quoteStr != null ? quoteStr : DEFAULT_IDENTIFIER_QUOTE; + } + + public String quote(String text) { + return LITERAL_QUOTE + text + LITERAL_QUOTE; + } + + public String generate(EvalNode node) { + Context context = new Context(); + visit(context, node, new Stack<EvalNode>()); + return context.sb.toString(); + } + + public static class Context { + StringBuilder sb = new StringBuilder(); + + public void append(String text) { + sb.append(text).append(" "); + } + } + + @Override + protected EvalNode visitUnaryEval(Context context, UnaryEval unary, Stack<EvalNode> stack) { + + switch (unary.getType()) { + case NOT: + context.sb.append("NOT "); + super.visitUnaryEval(context, unary, stack); + break; + case SIGNED: + SignedEval signed = (SignedEval) unary; + if (signed.isNegative()) { + context.sb.append("-"); + } + super.visitUnaryEval(context, unary, stack); + break; + case IS_NULL: + super.visitUnaryEval(context, unary, stack); + + IsNullEval isNull = (IsNullEval) unary; + if (isNull.isNot()) { + context.sb.append("IS NOT NULL "); + } else { + context.sb.append("IS NULL "); + } + break; + + case CAST: + super.visitUnaryEval(context, unary, stack); + context.sb.append(" AS ").append(convertTajoTypeToSQLType(unary.getValueType())); + } + return unary; + } + + @Override + protected EvalNode visitBinaryEval(Context context, Stack<EvalNode> stack, BinaryEval binaryEval) { + stack.push(binaryEval); + visit(context, binaryEval.getLeftExpr(), stack); + context.sb.append(convertBinOperatorToSQLRepr(binaryEval.getType())).append(" "); + visit(context, binaryEval.getRightExpr(), stack); + stack.pop(); + return binaryEval; + } + + @Override + protected EvalNode visitConst(Context context, ConstEval constant, Stack<EvalNode> stack) { + context.sb.append(convertDatumToSQLLiteral(constant.getValue())).append(" "); + return constant; + } + + protected EvalNode visitRowConstant(Context context, RowConstantEval row, Stack<EvalNode> stack) { + StringBuilder sb = 
new StringBuilder("("); + sb.append(StringUtils.join(row.getValues(), ",", new Function<Datum, String>() { + @Override + public String apply(Datum v) { + return convertDatumToSQLLiteral(v); + } + })); + sb.append(")"); + context.append(sb.toString()); + + return row; + } + + @Override + protected EvalNode visitField(Context context, FieldEval field, Stack<EvalNode> stack) { + // strip the database name + String tableName; + if (CatalogUtil.isSimpleIdentifier(field.getQualifier())) { + tableName = field.getQualifier(); + } else { + tableName = CatalogUtil.extractSimpleName(field.getQualifier()); + } + + context.append(CatalogUtil.buildFQName(tableName, field.getColumnName())); + return field; + } + + @Override + protected EvalNode visitBetween(Context context, BetweenPredicateEval evalNode, Stack<EvalNode> stack) { + stack.push(evalNode); + visit(context, evalNode.getPredicand(), stack); + context.append("BETWEEN"); + visit(context, evalNode.getBegin(), stack); + context.append("AND"); + visit(context, evalNode.getEnd(), stack); + return evalNode; + } + + @Override + protected EvalNode visitCaseWhen(Context context, CaseWhenEval evalNode, Stack<EvalNode> stack) { + stack.push(evalNode); + context.append("CASE"); + for (CaseWhenEval.IfThenEval ifThenEval : evalNode.getIfThenEvals()) { + visitIfThen(context, ifThenEval, stack); + } + + context.append("ELSE"); + if (evalNode.hasElse()) { + visit(context, evalNode.getElse(), stack); + } + stack.pop(); + context.append("END"); + return evalNode; + } + + @Override + protected EvalNode visitIfThen(Context context, CaseWhenEval.IfThenEval evalNode, Stack<EvalNode> stack) { + stack.push(evalNode); + context.append("WHEN"); + visit(context, evalNode.getCondition(), stack); + context.append("THEN"); + visit(context, evalNode.getResult(), stack); + stack.pop(); + return evalNode; + } + + @Override + protected EvalNode visitFuncCall(Context context, FunctionEval func, Stack<EvalNode> stack) { + // TODO - TAJO-1837 should be 
resolved if we support RDBMS functions better. + + stack.push(func); + + context.sb.append(func.getName()).append("("); + + boolean first = true; + for (EvalNode param : func.getArgs()) { + if (first) { + first = false; + } else { + context.sb.append(","); + } + + visit(context, param, stack); + } + + context.sb.append(")"); + stack.pop(); + + return func; + } + + @Override + protected EvalNode visitSubquery(Context context, SubqueryEval evalNode, Stack<EvalNode> stack) { + throw new TajoRuntimeException(new NotImplementedException()); + } + + /** + * convert Tajo literal into SQL representation + * + * @param d Datum + */ + public String convertDatumToSQLLiteral(Datum d) { + switch (d.type()) { + case BOOLEAN: + return d.asBool() ? "TRUE" : "FALSE"; + + case INT1: + case INT2: + case INT4: + case INT8: + case FLOAT4: + case FLOAT8: + case NUMERIC: + return d.asChars(); + + case TEXT: + case VARCHAR: + case CHAR: + return quote(d.asChars()); + + case DATE: + return "DATE " + quote(d.asChars()); + + case TIME: + return "TIME " + quote(d.asChars()); + + case TIMESTAMP: + return "TIMESTAMP " + quote(d.asChars()); + + case NULL_TYPE: + return "NULL"; + + default: + throw new TajoRuntimeException(new UnsupportedDataTypeException(d.type().name())); + } + } + + /** + * Convert Tajo DataType into SQL DataType + * + * @param dataType Tajo DataType + * @return SQL DataType + */ + public String convertTajoTypeToSQLType(DataType dataType) { + switch (dataType.getType()) { + case INT1: + return "TINYINT"; + case INT2: + return "SMALLINT"; + case INT4: + return "INTEGER"; + case INT8: + return "BIGINT"; + case FLOAT4: + return "FLOAT"; + case FLOAT8: + return "DOUBLE"; + default: + return dataType.getType().name(); + } + } + + /** + * Convert EvalType the operator notation into SQL notation + * + * @param op EvalType + * @return SQL representation + */ + public String convertBinOperatorToSQLRepr(EvalType op) { + return op.getOperatorName(); + } +} 
http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-jdbc/src/main/proto/JdbcFragmentProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-jdbc/src/main/proto/JdbcFragmentProtos.proto b/tajo-storage/tajo-storage-jdbc/src/main/proto/JdbcFragmentProtos.proto new file mode 100644 index 0000000..f642e07 --- /dev/null +++ b/tajo-storage/tajo-storage-jdbc/src/main/proto/JdbcFragmentProtos.proto @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+option java_package = "org.apache.tajo.storage.jdbc";
+option java_outer_classname = "JdbcFragmentProtos";
+option optimize_for = SPEED;
+option java_generic_services = false;
+option java_generate_equals_and_hash = true;
+
+// Serialized form of a JDBC fragment. Only scalar fields are used, so no other .proto
+// file needs to be imported.
+message JdbcFragmentProto {
+  // JDBC URI the scanner connects to when reading this fragment.
+  required string uri = 1;
+  required string input_source_id = 2;
+  // NOTE(review): meaning of hosts is defined by JdbcFragment, which is not visible here.
+  repeated string hosts = 3;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-jdbc/src/main/test/java/org/apache/tajo/storage/jdbc/TestConnectionInfo.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-jdbc/src/main/test/java/org/apache/tajo/storage/jdbc/TestConnectionInfo.java b/tajo-storage/tajo-storage-jdbc/src/main/test/java/org/apache/tajo/storage/jdbc/TestConnectionInfo.java
new file mode 100644
index 0000000..a04209f
--- /dev/null
+++ b/tajo-storage/tajo-storage-jdbc/src/main/test/java/org/apache/tajo/storage/jdbc/TestConnectionInfo.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.jdbc;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Verifies how {@link ConnectionInfo#fromURI(String)} splits a JDBC URI into scheme, host,
+ * credentials, database, table and the remaining driver parameters.
+ */
+public class TestConnectionInfo {
+  /** No database path and no table parameter: only the credentials are pulled out. */
+  @Test
+  public final void testGetConnectionInfoType1() {
+    String jdbcUri = "jdbc:mysql://localhost:55840?user=testuser&password=testpass";
+    ConnectionInfo info = ConnectionInfo.fromURI(jdbcUri);
+
+    assertEquals("jdbc:mysql", info.scheme);
+    assertEquals("localhost", info.host);
+    assertEquals("testuser", info.user);
+    assertEquals("testpass", info.password);
+    assertNull(info.dbName);
+    assertNull(info.tableName);
+    assertEquals(0, info.params.size());
+  }
+
+  /** Database path, table parameter, and one extra driver parameter that stays in params. */
+  @Test
+  public final void testGetConnectionInfoType2() {
+    String jdbcUri = "jdbc:mysql://localhost:55840/db1?table=tb1&user=testuser&password=testpass&TZ=GMT+9";
+    ConnectionInfo info = ConnectionInfo.fromURI(jdbcUri);
+
+    assertEquals("jdbc:mysql", info.scheme);
+    assertEquals("localhost", info.host);
+    assertEquals("testuser", info.user);
+    assertEquals("testpass", info.password);
+    assertEquals("db1", info.dbName);
+    assertEquals("tb1", info.tableName);
+    assertEquals(1, info.params.size());
+    assertEquals("GMT+9", info.params.get("TZ"));
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-pgsql/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-pgsql/pom.xml b/tajo-storage/tajo-storage-pgsql/pom.xml
new file mode 100644
index 0000000..c36d9bf
--- /dev/null
+++ b/tajo-storage/tajo-storage-pgsql/pom.xml
@@ -0,0 +1,257 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>tajo-project</artifactId> + <groupId>org.apache.tajo</groupId> + <version>0.12.0-SNAPSHOT</version> + <relativePath>../../tajo-project</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>tajo-storage-pgsql</artifactId> + <packaging>jar</packaging> + <name>Tajo PostgreSQL JDBC storage</name> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + </properties> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.7</source> + <target>1.7</target> + <encoding>${project.build.sourceEncoding}</encoding> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes> + <exclude>derby.log</exclude> + <exclude>src/test/resources/dataset/**</exclude> + <exclude>src/test/resources/queries/**</exclude> + <exclude>src/test/resources/results/**</exclude> + <exclude>src/test/resources/pgsql/**</exclude> + </excludes> + </configuration> + <executions> + <execution> + <phase>verify</phase> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + 
<groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <systemProperties> + <tajo.test>TRUE</tajo.test> + </systemProperties> + <argLine>-Xms512m -Xmx1024m -XX:MaxPermSize=128m -Dfile.encoding=UTF-8</argLine> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>2.4</version> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + + + <dependencies> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-common</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-catalog-common</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-plan</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-storage-common</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-storage-hdfs</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-storage-jdbc</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-cluster-tests</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>commons-el</groupId> + <artifactId>commons-el</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + 
<artifactId>jasper-runtime</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-compiler</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jsp-2.1-jetty</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey.jersey-test-framework</groupId> + <artifactId>jersey-test-framework-grizzly2</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>commons-el</groupId> + <artifactId>commons-el</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-runtime</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-compiler</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jsp-2.1-jetty</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey.jersey-test-framework</groupId> + <artifactId>jersey-test-framework-grizzly2</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>io.airlift</groupId> + <artifactId>testing-postgresql-server</artifactId> + <version>0.3</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>15.0</version> + <scope>test</scope> + </dependency> + </dependencies> + + <profiles> + <profile> + <id>docs</id> + <activation> + <activeByDefault>false</activeByDefault> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <executions> + <execution> + <!-- build 
javadoc jars per jar for publishing to maven --> + <id>module-javadocs</id> + <phase>package</phase> + <goals> + <goal>jar</goal> + </goals> + <configuration> + <destDir>${project.build.directory}</destDir> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> + + <reporting> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-report-plugin</artifactId> + <version>2.15</version> + </plugin> + </plugins> + </reporting> +</project> http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-pgsql/src/main/java/org/apache/tajo/storage/pgsql/PgSQLJdbcScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-pgsql/src/main/java/org/apache/tajo/storage/pgsql/PgSQLJdbcScanner.java b/tajo-storage/tajo-storage-pgsql/src/main/java/org/apache/tajo/storage/pgsql/PgSQLJdbcScanner.java new file mode 100644 index 0000000..2fa9f62 --- /dev/null +++ b/tajo-storage/tajo-storage-pgsql/src/main/java/org/apache/tajo/storage/pgsql/PgSQLJdbcScanner.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.pgsql; + +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.storage.jdbc.JdbcFragment; +import org.apache.tajo.storage.jdbc.JdbcScanner; + +import java.sql.DatabaseMetaData; +import java.util.Properties; + +public class PgSQLJdbcScanner extends JdbcScanner { + + public PgSQLJdbcScanner(DatabaseMetaData dbMetaData, + Properties connProperties, + Schema tableSchema, + TableMeta tableMeta, + JdbcFragment fragment) { + super(dbMetaData, connProperties, tableSchema, tableMeta, fragment); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-pgsql/src/main/java/org/apache/tajo/storage/pgsql/PgSQLMetadataProvider.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-pgsql/src/main/java/org/apache/tajo/storage/pgsql/PgSQLMetadataProvider.java b/tajo-storage/tajo-storage-pgsql/src/main/java/org/apache/tajo/storage/pgsql/PgSQLMetadataProvider.java new file mode 100644 index 0000000..069ffe3 --- /dev/null +++ b/tajo-storage/tajo-storage-pgsql/src/main/java/org/apache/tajo/storage/pgsql/PgSQLMetadataProvider.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.pgsql; + +import org.apache.tajo.storage.jdbc.JdbcMetadataProviderBase; + +import javax.annotation.Nullable; +import java.util.Collection; + +public class PgSQLMetadataProvider extends JdbcMetadataProviderBase { + + public PgSQLMetadataProvider(PgSQLTablespace space, String dbName) { + super(space, dbName); + } + + @Override + protected String getJdbcDriverName() { + return "org.postgresql.Driver"; + } + + @Override + public Collection<String> getTables(@Nullable String schemaPattern, @Nullable String tablePattern) { + return super.getTables("public", tablePattern); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-pgsql/src/main/java/org/apache/tajo/storage/pgsql/PgSQLTablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-pgsql/src/main/java/org/apache/tajo/storage/pgsql/PgSQLTablespace.java b/tajo-storage/tajo-storage-pgsql/src/main/java/org/apache/tajo/storage/pgsql/PgSQLTablespace.java new file mode 100644 index 0000000..1969d13 --- /dev/null +++ b/tajo-storage/tajo-storage-pgsql/src/main/java/org/apache/tajo/storage/pgsql/PgSQLTablespace.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.pgsql; + +import net.minidev.json.JSONObject; +import org.apache.tajo.catalog.MetadataProvider; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.exception.TajoInternalError; +import org.apache.tajo.storage.NullScanner; +import org.apache.tajo.storage.Scanner; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.storage.jdbc.JdbcFragment; +import org.apache.tajo.storage.jdbc.JdbcTablespace; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.net.URI; + +/** + * Postgresql Database Tablespace + */ +public class PgSQLTablespace extends JdbcTablespace { + + public PgSQLTablespace(String name, URI uri, JSONObject config) { + super(name, uri, config); + } + + public MetadataProvider getMetadataProvider() { + return new PgSQLMetadataProvider(this, database); + } + + @Override + public Scanner getScanner(TableMeta meta, + Schema schema, + Fragment fragment, + @Nullable Schema target) throws IOException { + if (!(fragment instanceof JdbcFragment)) { + throw new TajoInternalError("fragment must be JdbcFragment"); + } + + if (target == null) { + target = schema; + } + + Scanner scanner; + if (fragment.isEmpty()) { + scanner = new NullScanner(conf, schema, meta, fragment); + } else { + scanner = new PgSQLJdbcScanner(getDatabaseMetaData(), connProperties, schema, meta, (JdbcFragment) fragment); + } + scanner.setTarget(target.toArray()); + return scanner; + } +} 
http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-pgsql/src/test/java/org/apache/tajo/storage/pgsql/PgSQLTestServer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-pgsql/src/test/java/org/apache/tajo/storage/pgsql/PgSQLTestServer.java b/tajo-storage/tajo-storage-pgsql/src/test/java/org/apache/tajo/storage/pgsql/PgSQLTestServer.java new file mode 100644 index 0000000..024b497 --- /dev/null +++ b/tajo-storage/tajo-storage-pgsql/src/test/java/org/apache/tajo/storage/pgsql/PgSQLTestServer.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.pgsql; + +import com.google.common.base.Optional; +import io.airlift.testing.postgresql.TestingPostgreSqlServer; +import net.minidev.json.JSONObject; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.storage.Tablespace; +import org.apache.tajo.storage.TablespaceManager; +import org.apache.tajo.storage.jdbc.JdbcTablespace; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.JavaResourceUtil; + +import java.io.IOException; +import java.net.URI; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Map; + +public class PgSQLTestServer { + private static final Log LOG = LogFactory.getLog(PgSQLTestServer.class); + + private static PgSQLTestServer instance; + + public static final String [] TPCH_TABLES = { + "customer", "lineitem", "nation", "orders", "part", "partsupp", "region", "supplier" + }; + + public static final String SPACENAME = "pgsql_cluster"; + public static final String DATABASE_NAME = "tpch"; + public static final String USERNAME = "testuser"; + public static final String PASSWORD = ""; + + private final TestingPostgreSqlServer server; + + static { + try { + instance = new PgSQLTestServer(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static PgSQLTestServer getInstance() { + return instance; + } + + private PgSQLTestServer() throws Exception { + server = new TestingPostgreSqlServer(USERNAME, + "tpch" + ); + + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + try { + server.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + })); + + loadTPCHTables(); + registerTablespace(); + } + + Path testPath = CommonTestingUtil.getTestDir(); + + 
private void loadTPCHTables() throws SQLException, IOException { + + + try (Connection connection = DriverManager.getConnection(getJdbcUrlForAdmin(), "postgres", null)) { + connection.setCatalog("tpch"); + + try (Statement statement = connection.createStatement()) { + + for (String tableName : TPCH_TABLES) { + String sql = JavaResourceUtil.readTextFromResource("pgsql/" + tableName + ".sql"); + statement.executeUpdate(sql); + + // restore the table contents into a file stored in a local file system for PgSQL COPY command + String path = restoreTableContents(tableName); + String copyCommand = genLoadStatement(tableName, path); + statement.executeUpdate(copyCommand); + } + + // load DATETIME_TYPES table + String sql = JavaResourceUtil.readTextFromResource("pgsql/datetime_types.sql"); + statement.executeUpdate(sql); + Path filePath = new Path(testPath, "datetime_types.txt"); + storeTableContents("pgsql/datetime_types.txt", filePath); + String copyCommand = genLoadStatement("datetime_types", filePath.toUri().getPath()); + LOG.info(copyCommand); + statement.executeUpdate(copyCommand); + + } catch (Throwable t) { + t.printStackTrace(); + throw t; + } + } + } + + private String genLoadStatement(String tableName, String path) { + return "COPY " + tableName + " FROM '" + path + "' WITH (FORMAT csv, DELIMITER '|');"; + } + + private void storeTableContents(String resource, Path path) throws IOException { + String csvTable = JavaResourceUtil.readTextFromResource(resource); + String fixedCsvTable = fixExtraColumn(csvTable); + FileUtil.writeTextToFile(fixedCsvTable, path); + } + + private String restoreTableContents(String tableName) throws IOException { + Path filePath = new Path(testPath, tableName + ".tbl"); + storeTableContents("tpch/" + tableName + ".tbl", filePath); + return filePath.toUri().getPath(); + } + + private String fixExtraColumn(String csvTable) { + final String [] lines = csvTable.split("\n"); + final StringBuilder rewritten = new StringBuilder(); + + for 
(String l : lines) { + if (l.charAt(l.length() - 1) == '|') { + rewritten.append(l.substring(0, l.length() - 1)); + } else { + rewritten.append(l.substring(0, l.length())); + } + rewritten.append("\n"); + } + + return rewritten.toString(); + } + + private void registerTablespace() throws IOException { + JSONObject configElements = new JSONObject(); + configElements.put(JdbcTablespace.CONFIG_KEY_MAPPED_DATABASE, DATABASE_NAME); + + PgSQLTablespace tablespace = new PgSQLTablespace(SPACENAME, URI.create(getJdbcUrlForAdmin()), configElements); + tablespace.init(new TajoConf()); + + TablespaceManager.addTableSpaceForTest(tablespace); + } + + /** + * get JDBC URL for a created user + * + * @return JDBC URL for the created user + */ + public String getJdbcUrl() { + return server.getJdbcUrl() + "&connectTimeout=5&socketTimeout=5"; + } + + /** + * get JDBC URL for the Admin user + * + * @return JDBC URL for the Admin user + */ + public String getJdbcUrlForAdmin() { + // replace 'user' by postgres (admin) + String url = server.getJdbcUrl().split("\\?")[0]; + url += "?user=postgres&connectTimeout=5&socketTimeout=5"; + return url; + } + + public TestingPostgreSqlServer getServer() { + return server; + } + + public static Optional<Tablespace> resetAllParamsAndSetConnProperties(Map<String, String> connProperties) + throws IOException { + String uri = PgSQLTestServer.getInstance().getJdbcUrl().split("\\?")[0]; + + JSONObject configElements = new JSONObject(); + configElements.put(JdbcTablespace.CONFIG_KEY_MAPPED_DATABASE, PgSQLTestServer.DATABASE_NAME); + + JSONObject connPropertiesJson = new JSONObject(); + for (Map.Entry<String, String> entry : connProperties.entrySet()) { + connPropertiesJson.put(entry.getKey(), entry.getValue()); + } + configElements.put(JdbcTablespace.CONFIG_KEY_CONN_PROPERTIES, connPropertiesJson); + + PgSQLTablespace tablespace = new PgSQLTablespace(PgSQLTestServer.SPACENAME, URI.create(uri), configElements); + tablespace.init(new TajoConf()); + return 
TablespaceManager.addTableSpaceForTest(tablespace); + } +}
