Repository: samza Updated Branches: refs/heads/1.0.0 ba90a51c8 -> b8e7103be
http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java ---------------------------------------------------------------------- diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java new file mode 100755 index 0000000..7c2ca32 --- /dev/null +++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java @@ -0,0 +1,511 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.sql.client.impl; + +import com.google.common.base.Joiner; +import kafka.utils.ZkUtils; +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkConnection; +import org.I0Itec.zkclient.exception.ZkTimeoutException; +import org.apache.calcite.rel.RelRoot; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.samza.SamzaException; +import org.apache.samza.config.*; +import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory; +import org.apache.samza.job.ApplicationStatus; +import org.apache.samza.serializers.StringSerdeFactory; +import org.apache.samza.sql.avro.AvroRelSchemaProvider; +import org.apache.samza.sql.client.interfaces.*; +import org.apache.samza.sql.client.util.RandomAccessQueue; +import org.apache.samza.sql.dsl.SamzaSqlDslConverter; +import org.apache.samza.sql.dsl.SamzaSqlDslConverterFactory; +import org.apache.samza.sql.fn.FlattenUdf; +import org.apache.samza.sql.fn.RegexMatchUdf; +import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory; +import org.apache.samza.sql.impl.ConfigBasedUdfResolver; +import org.apache.samza.sql.interfaces.RelSchemaProvider; +import org.apache.samza.sql.interfaces.RelSchemaProviderFactory; +import org.apache.samza.sql.interfaces.SqlIOConfig; +import org.apache.samza.sql.interfaces.SqlIOResolver; +import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; +import org.apache.samza.sql.runner.SamzaSqlApplicationRunner; +import org.apache.samza.sql.testutil.JsonUtil; +import org.apache.samza.standalone.PassthroughJobCoordinatorFactory; +import org.apache.samza.system.OutgoingMessageEnvelope; +import org.apache.samza.system.kafka.KafkaSystemFactory; +import org.apache.samza.tools.avro.AvroSchemaGenRelConverterFactory; +import org.apache.samza.tools.avro.AvroSerDeFactory; +import org.apache.samza.tools.json.JsonRelConverterFactory; +import org.apache.samza.tools.schemas.ProfileChangeEvent; +import org.codehaus.jackson.map.ObjectMapper; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.JavaConversions; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + + +/** + * Samza implementation of Executor for Samza SQL Shell. + */ +public class SamzaExecutor implements SqlExecutor { + private static final Logger LOG = LoggerFactory.getLogger(SamzaExecutor.class); + + private static final String SAMZA_SYSTEM_LOG = "log"; + private static final String SAMZA_SYSTEM_KAFKA = "kafka"; + private static final String SAMZA_SQL_OUTPUT = "samza.sql.output"; + private static final String SAMZA_SQL_SYSTEM_KAFKA_ADDRESS = "samza.sql.system.kafka.address"; + private static final String DEFAULT_SERVER_ADDRESS = "localhost:2181"; + + // The maximum number of rows of data we keep when user pauses the display view and data accumulates. + private static final int RANDOM_ACCESS_QUEUE_CAPACITY = 5000; + private static final int DEFAULT_ZOOKEEPER_CLIENT_TIMEOUT = 20000; + + private static RandomAccessQueue<OutgoingMessageEnvelope> outputData = + new RandomAccessQueue<>(OutgoingMessageEnvelope.class, RANDOM_ACCESS_QUEUE_CAPACITY); + private static AtomicInteger execIdSeq = new AtomicInteger(0); + private Map<Integer, SamzaSqlApplicationRunner> executions = new HashMap<>(); + private String lastErrorMsg = ""; + + // -- implementation of SqlExecutor ------------------------------------------ + + @Override + public void start(ExecutionContext context) { + } + + @Override + public void stop(ExecutionContext context) { + Iterator<Integer> iter = executions.keySet().iterator(); + while (iter.hasNext()) { + stopExecution(context, iter.next()); + iter.remove(); + } + outputData.clear(); + } + + @Override + public List<String> listTables(ExecutionContext context) { + /** + * TODO: currently Shell can only talk to Kafka system, but we should use a 
general way + * to connect to different systems. + */ + lastErrorMsg = ""; + String address = context.getConfigMap().getOrDefault(SAMZA_SQL_SYSTEM_KAFKA_ADDRESS, DEFAULT_SERVER_ADDRESS); + List<String> tables = null; + try { + ZkUtils zkUtils = new ZkUtils(new ZkClient(address, DEFAULT_ZOOKEEPER_CLIENT_TIMEOUT), + new ZkConnection(address), false); + tables = JavaConversions.seqAsJavaList(zkUtils.getAllTopics()) + .stream() + .map(x -> SAMZA_SYSTEM_KAFKA + "." + x) + .collect(Collectors.toList()); + } catch (ZkTimeoutException ex) { + lastErrorMsg = ex.toString(); + LOG.error(lastErrorMsg); + } + return tables; + } + + @Override + public SqlSchema getTableSchema(ExecutionContext context, String tableName) { + /** + * currently Shell works only for systems that have Avro schemas + */ + lastErrorMsg = ""; + int execId = execIdSeq.incrementAndGet(); + Map<String, String> staticConfigs = fetchSamzaSqlConfig(execId, context); + Config samzaSqlConfig = new MapConfig(staticConfigs); + SqlSchema sqlSchema = null; + try { + SqlIOResolver ioResolver = SamzaSqlApplicationConfig.createIOResolver(samzaSqlConfig); + SqlIOConfig sourceInfo = ioResolver.fetchSourceInfo(tableName); + RelSchemaProvider schemaProvider = + SamzaSqlApplicationConfig.initializePlugin("RelSchemaProvider", sourceInfo.getRelSchemaProviderName(), + samzaSqlConfig, SamzaSqlApplicationConfig.CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN, + (o, c) -> ((RelSchemaProviderFactory) o).create(sourceInfo.getSystemStream(), c)); + AvroRelSchemaProvider avroSchemaProvider = (AvroRelSchemaProvider) schemaProvider; + String schema = avroSchemaProvider.getSchema(sourceInfo.getSystemStream()); + sqlSchema = AvroSqlSchemaConverter.convertAvroToSamzaSqlSchema(schema); + } catch (SamzaException ex) { + lastErrorMsg = ex.toString(); + LOG.error(lastErrorMsg); + } + return sqlSchema; + } + + @Override + public QueryResult executeQuery(ExecutionContext context, String statement) { + lastErrorMsg = ""; + outputData.clear(); + + int execId = 
execIdSeq.incrementAndGet(); + Map<String, String> staticConfigs = fetchSamzaSqlConfig(execId, context); + List<String> sqlStmts = formatSqlStmts(Collections.singletonList(statement)); + staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); + + SamzaSqlApplicationRunner runner; + try { + runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); + runner.run(); + } catch (SamzaException ex) { + lastErrorMsg = ex.toString(); + LOG.error(lastErrorMsg); + return new QueryResult(execId, null, false); + } + executions.put(execId, runner); + LOG.debug("Executing sql. Id ", execId); + + return new QueryResult(execId, generateResultSchema(new MapConfig(staticConfigs)), true); + } + + @Override + public int getRowCount() { + return outputData.getSize(); + } + + @Override + public List<String[]> retrieveQueryResult(ExecutionContext context, int startRow, int endRow) { + List<String[]> results = new ArrayList<>(); + for (OutgoingMessageEnvelope row : outputData.get(startRow, endRow)) { + results.add(getFormattedRow(context, row)); + } + return results; + } + + @Override + public List<String[]> consumeQueryResult(ExecutionContext context, int startRow, int endRow) { + List<String[]> results = new ArrayList<>(); + for (OutgoingMessageEnvelope row : outputData.consume(startRow, endRow)) { + results.add(getFormattedRow(context, row)); + } + return results; + } + + @Override + public NonQueryResult executeNonQuery(ExecutionContext context, File sqlFile) { + lastErrorMsg = ""; + + LOG.info("Sql file path: " + sqlFile.getPath()); + List<String> executedStmts = new ArrayList<>(); + try { + executedStmts = Files.lines(Paths.get(sqlFile.getPath())).collect(Collectors.toList()); + } catch (IOException e) { + lastErrorMsg = String.format("Unable to parse the sql file %s. 
%s", sqlFile.getPath(), e.toString()); + LOG.error(lastErrorMsg); + return new NonQueryResult(-1, false); + } + LOG.info("Sql statements in Sql file: " + executedStmts.toString()); + + List<String> submittedStmts = new ArrayList<>(); + List<String> nonSubmittedStmts = new ArrayList<>(); + validateExecutedStmts(executedStmts, submittedStmts, nonSubmittedStmts); + if (submittedStmts.isEmpty()) { + lastErrorMsg = "Nothing to execute. Note: SELECT statements are ignored."; + LOG.warn("Nothing to execute. Statements in the Sql file: {}", nonSubmittedStmts); + return new NonQueryResult(-1, false); + } + NonQueryResult result = executeNonQuery(context, submittedStmts); + return new NonQueryResult(result.getExecutionId(), result.succeeded(), submittedStmts, nonSubmittedStmts); + } + + @Override + public NonQueryResult executeNonQuery(ExecutionContext context, List<String> statement) { + lastErrorMsg = ""; + + int execId = execIdSeq.incrementAndGet(); + Map<String, String> staticConfigs = fetchSamzaSqlConfig(execId, context); + staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(formatSqlStmts(statement))); + + SamzaSqlApplicationRunner runner; + try { + runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); + runner.run(); + } catch (SamzaException ex) { + lastErrorMsg = ex.toString(); + LOG.error(lastErrorMsg); + return new NonQueryResult(execId, false); + } + executions.put(execId, runner); + LOG.debug("Executing sql. 
Id ", execId); + + return new NonQueryResult(execId, true); + } + + @Override + public boolean stopExecution(ExecutionContext context, int exeId) { + lastErrorMsg = ""; + + SamzaSqlApplicationRunner runner = executions.get(exeId); + if (runner != null) { + LOG.debug("Stopping execution ", exeId); + + try { + runner.kill(); + } catch (SamzaException ex) { + lastErrorMsg = ex.toString(); + LOG.debug(lastErrorMsg); + return false; + } + + try { + Thread.sleep(500); // wait for half a second + } catch (InterruptedException e) { + e.printStackTrace(); + } + + return true; + } else { + lastErrorMsg = "Trying to stop a non-existing SQL execution " + exeId; + LOG.warn(lastErrorMsg); + return false; + } + } + + @Override + public boolean removeExecution(ExecutionContext context, int exeId) { + lastErrorMsg = ""; + + SamzaSqlApplicationRunner runner = executions.get(exeId); + if (runner != null) { + if (runner.status().getStatusCode().equals(ApplicationStatus.StatusCode.Running)) { + lastErrorMsg = "Trying to remove a ongoing execution " + exeId; + LOG.error(lastErrorMsg); + return false; + } + executions.remove(exeId); + LOG.debug("Stopping execution ", exeId); + return true; + } else { + lastErrorMsg = "Trying to remove a non-existing SQL execution " + exeId; + LOG.warn(lastErrorMsg); + return false; + } + } + + @Override + public ExecutionStatus queryExecutionStatus(int execId) { + SamzaSqlApplicationRunner runner = executions.get(execId); + if (runner == null) { + return null; + } + return queryExecutionStatus(runner); + } + + @Override + public String getErrorMsg() { + return lastErrorMsg; + } + + @Override + public List<SqlFunction> listFunctions(ExecutionContext context) { + /** + * TODO: currently the Shell only shows some UDFs supported by Samza internally. 
We may need to require UDFs + * to provide a function of getting their "SamzaSqlUdfDisplayInfo", then we can get the UDF information from + * SamzaSqlApplicationConfig.udfResolver(or SamzaSqlApplicationConfig.udfMetadata) instead of registering + * UDFs one by one as below. + */ + List<SqlFunction> udfs = new ArrayList<>(); + udfs.add(new SamzaSqlUdfDisplayInfo("RegexMatch", "Matches the string to the regex", + Arrays.asList(SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.STRING), + SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.STRING)), + SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.BOOLEAN))); + + return udfs; + } + + static void saveOutputMessage(OutgoingMessageEnvelope messageEnvelope) { + outputData.add(messageEnvelope); + } + + static Map<String, String> fetchSamzaSqlConfig(int execId, ExecutionContext executionContext) { + HashMap<String, String> staticConfigs = new HashMap<>(); + + staticConfigs.put(JobConfig.JOB_NAME(), "sql-job-" + execId); + staticConfigs.put(JobConfig.PROCESSOR_ID(), String.valueOf(execId)); + staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName()); + staticConfigs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName()); + + staticConfigs.put(SamzaSqlApplicationConfig.CFG_IO_RESOLVER, "config"); + String configIOResolverDomain = + String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, "config"); + staticConfigs.put(configIOResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY, + ConfigBasedIOResolverFactory.class.getName()); + + staticConfigs.put(SamzaSqlApplicationConfig.CFG_UDF_RESOLVER, "config"); + String configUdfResolverDomain = String.format(SamzaSqlApplicationConfig.CFG_FMT_UDF_RESOLVER_DOMAIN, "config"); + staticConfigs.put(configUdfResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY, + ConfigBasedUdfResolver.class.getName()); + 
staticConfigs.put(configUdfResolverDomain + ConfigBasedUdfResolver.CFG_UDF_CLASSES, + Joiner.on(",").join(RegexMatchUdf.class.getName(), FlattenUdf.class.getName())); + + staticConfigs.put("serializers.registry.string.class", StringSerdeFactory.class.getName()); + staticConfigs.put("serializers.registry.avro.class", AvroSerDeFactory.class.getName()); + staticConfigs.put(AvroSerDeFactory.CFG_AVRO_SCHEMA, ProfileChangeEvent.SCHEMA$.toString()); + + String kafkaSystemConfigPrefix = + String.format(ConfigBasedIOResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_KAFKA); + String avroSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_KAFKA); + staticConfigs.put(kafkaSystemConfigPrefix + "samza.factory", KafkaSystemFactory.class.getName()); + staticConfigs.put(kafkaSystemConfigPrefix + "samza.key.serde", "string"); + staticConfigs.put(kafkaSystemConfigPrefix + "samza.msg.serde", "avro"); + staticConfigs.put(kafkaSystemConfigPrefix + "consumer.zookeeper.connect", "localhost:2181"); + staticConfigs.put(kafkaSystemConfigPrefix + "producer.bootstrap.servers", "localhost:9092"); + + staticConfigs.put(kafkaSystemConfigPrefix + "samza.offset.reset", "true"); + staticConfigs.put(kafkaSystemConfigPrefix + "samza.offset.default", "oldest"); + + staticConfigs.put(avroSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro"); + staticConfigs.put(avroSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config"); + + String logSystemConfigPrefix = + String.format(ConfigBasedIOResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_LOG); + String logSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_LOG); + staticConfigs.put(logSystemConfigPrefix + "samza.factory", CliLoggingSystemFactory.class.getName()); + staticConfigs.put(logSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "json"); + staticConfigs.put(logSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config"); + + String 
avroSamzaToRelMsgConverterDomain = + String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "avro"); + + staticConfigs.put(avroSamzaToRelMsgConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY, + AvroSchemaGenRelConverterFactory.class.getName()); + + String jsonSamzaToRelMsgConverterDomain = + String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "json"); + + staticConfigs.put(jsonSamzaToRelMsgConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY, + JsonRelConverterFactory.class.getName()); + + String configAvroRelSchemaProviderDomain = + String.format(SamzaSqlApplicationConfig.CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN, "config"); + staticConfigs.put(configAvroRelSchemaProviderDomain + SamzaSqlApplicationConfig.CFG_FACTORY, + FileSystemAvroRelSchemaProviderFactory.class.getName()); + + staticConfigs.put( + configAvroRelSchemaProviderDomain + FileSystemAvroRelSchemaProviderFactory.CFG_SCHEMA_DIR, + "/tmp/schemas/"); + + /* TODO: we need to validate and read configurations from shell-defaults.conf (aka. "executionContext"), + * and update their values if they've been included in staticConfigs. We could handle this logic + * at the Shell level or at the Executor level. + */ + staticConfigs.putAll(executionContext.getConfigMap()); + + return staticConfigs; + } + + private List<String> formatSqlStmts(List<String> statements) { + return statements.stream().map(sql -> { + if (!sql.toLowerCase().startsWith("insert")) { + String formattedSql = String.format("insert into log.outputStream %s", sql); + LOG.debug("Sql formatted. 
", sql, formattedSql); + return formattedSql; + } else { + return sql; + } + }).collect(Collectors.toList()); + } + + private void validateExecutedStmts(List<String> statements, List<String> submittedStmts, + List<String> nonSubmittedStmts) { + for (String sql : statements) { + if (sql.isEmpty()) { + continue; + } + if (!sql.toLowerCase().startsWith("insert")) { + nonSubmittedStmts.add(sql); + } else { + submittedStmts.add(sql); + } + } + } + + SqlSchema generateResultSchema(Config config) { + SamzaSqlDslConverter converter = (SamzaSqlDslConverter) new SamzaSqlDslConverterFactory().create(config); + RelRoot relRoot = converter.convertDsl("").iterator().next(); + + List<String> colNames = new ArrayList<>(); + List<String> colTypeNames = new ArrayList<>(); + for (RelDataTypeField dataTypeField : relRoot.validatedRowType.getFieldList()) { + colNames.add(dataTypeField.getName()); + colTypeNames.add(dataTypeField.getType().toString()); + } + + return new SqlSchema(colNames, colTypeNames); + } + + private String[] getFormattedRow(ExecutionContext context, OutgoingMessageEnvelope row) { + String[] formattedRow = new String[1]; + String outputFormat = context.getConfigMap().get(SAMZA_SQL_OUTPUT); + if (outputFormat == null || !outputFormat.equalsIgnoreCase(MessageFormat.PRETTY.toString())) { + formattedRow[0] = getCompressedFormat(row); + } else { + formattedRow[0] = getPrettyFormat(row); + } + return formattedRow; + } + + private ExecutionStatus queryExecutionStatus(SamzaSqlApplicationRunner runner) { + lastErrorMsg = ""; + switch (runner.status().getStatusCode()) { + case New: + return ExecutionStatus.New; + case Running: + return ExecutionStatus.Running; + case SuccessfulFinish: + return ExecutionStatus.SuccessfulFinish; + case UnsuccessfulFinish: + return ExecutionStatus.UnsuccessfulFinish; + default: + lastErrorMsg = String.format("Unsupported execution status %s", + runner.status().getStatusCode().toString()); + return null; + } + } + + private String 
getPrettyFormat(OutgoingMessageEnvelope envelope) { + String value = new String((byte[]) envelope.getMessage()); + ObjectMapper mapper = new ObjectMapper(); + String formattedValue; + try { + Object json = mapper.readValue(value, Object.class); + formattedValue = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(json); + } catch (IOException e) { + formattedValue = value; + LOG.error("Error while formatting json", e); + } + return formattedValue; + } + + private String getCompressedFormat(OutgoingMessageEnvelope envelope) { + return new String((byte[]) envelope.getMessage()); + } + + private enum MessageFormat { + PRETTY, + COMPACT + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaSqlFieldType.java ---------------------------------------------------------------------- diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaSqlFieldType.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaSqlFieldType.java new file mode 100644 index 0000000..ed11a53 --- /dev/null +++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaSqlFieldType.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.sql.client.impl; + +import org.apache.samza.sql.client.interfaces.SqlSchema; + +/** + * Types of Samza Sql fields. + */ +public class SamzaSqlFieldType { + + private TypeName typeName; + private SamzaSqlFieldType elementType; + private SamzaSqlFieldType valueType; + private SqlSchema rowSchema; + + private SamzaSqlFieldType(TypeName typeName, SamzaSqlFieldType elementType, SamzaSqlFieldType valueType, SqlSchema rowSchema) { + this.typeName = typeName; + this.elementType = elementType; + this.valueType = valueType; + this.rowSchema = rowSchema; + } + + public static SamzaSqlFieldType createPrimitiveFieldType(TypeName typeName) { + return new SamzaSqlFieldType(typeName, null, null, null); + } + + public static SamzaSqlFieldType createArrayFieldType(SamzaSqlFieldType elementType) { + return new SamzaSqlFieldType(TypeName.ARRAY, elementType, null, null); + } + + public static SamzaSqlFieldType createMapFieldType(SamzaSqlFieldType valueType) { + return new SamzaSqlFieldType(TypeName.MAP, null, valueType, null); + } + + public static SamzaSqlFieldType createRowFieldType(SqlSchema rowSchema) { + return new SamzaSqlFieldType(TypeName.ROW, null, null, rowSchema); + } + + public boolean isPrimitiveField() { + return typeName != TypeName.ARRAY && typeName != TypeName.MAP && typeName != TypeName.ROW; + } + + public TypeName getTypeName() { + return typeName; + } + + public SamzaSqlFieldType getElementType() { + return elementType; + } + + public SamzaSqlFieldType getValueType() { + return valueType; + } + + public SqlSchema getRowSchema() { + return rowSchema; + } + + public enum TypeName { + BYTE, // One-byte signed integer. + INT16, // two-byte signed integer. + INT32, // four-byte signed integer. + INT64, // eight-byte signed integer. + DECIMAL, // Decimal integer + FLOAT, + DOUBLE, + STRING, // String. + DATETIME, // Date and time. 
+ BOOLEAN, // Boolean. + BYTES, // Byte array. + ARRAY, + MAP, + ROW, // The field is itself a nested row. + ANY + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaSqlUdfDisplayInfo.java ---------------------------------------------------------------------- diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaSqlUdfDisplayInfo.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaSqlUdfDisplayInfo.java new file mode 100644 index 0000000..2e89978 --- /dev/null +++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaSqlUdfDisplayInfo.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.sql.client.impl; + +import com.google.common.base.Joiner; +import org.apache.samza.sql.client.interfaces.SqlFunction; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * UDF information displayer + */ +public class SamzaSqlUdfDisplayInfo implements SqlFunction { + + private String name; + + private String description; + + private List<SamzaSqlFieldType> argumentTypes; + + private SamzaSqlFieldType returnType; + + public SamzaSqlUdfDisplayInfo(String name, String description, List<SamzaSqlFieldType> argumentTypes, + SamzaSqlFieldType returnType) { + this.name = name; + this.description = description; + this.argumentTypes = argumentTypes; + this.returnType = returnType; + } + + public String getName() { + return name; + } + + public String getDescription() { + return description; + } + + public List<String> getArgumentTypes() { + return argumentTypes.stream().map(x -> x.getTypeName().toString()).collect(Collectors.toList()); + } + + public String getReturnType() { + return returnType.getTypeName().toString(); + } + + public String toString() { + List<String> argumentTypeNames = + argumentTypes.stream().map(x -> x.getTypeName().toString()).collect(Collectors.toList()); + String args = Joiner.on(", ").join(argumentTypeNames); + return String.format("%s(%s) returns <%s> : %s", name, args, returnType.getTypeName().toString(), description); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionContext.java ---------------------------------------------------------------------- diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionContext.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionContext.java new file mode 100644 index 0000000..14dfa64 --- /dev/null +++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionContext.java @@ -0,0 
+1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.sql.client.interfaces; + +import java.util.Map; + +/** + * Whenever the shell calls the executor to execute a SQL statement, an object of ExecutionContext is passed. + */ +public class ExecutionContext { + private Map<String, String> m_configs; + + public ExecutionContext(Map<String, String> config) { + m_configs = config; + } + + /** + * @return The Map storing all configuration pairs. Note that the returned map is the same instance as the one used by + * ExecutionContext, so changes to the map are reflected in ExecutionContext, and vice-versa. 
+ */ + public Map<String, String> getConfigMap() { + return m_configs; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionException.java ---------------------------------------------------------------------- diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionException.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionException.java new file mode 100644 index 0000000..5991922 --- /dev/null +++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionException.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.sql.client.interfaces; + +/** + * An executor shall throw an ExecutionException when it encounters an unrecoverable error. 
+ */ +public class ExecutionException extends RuntimeException { + public ExecutionException() { + } + + public ExecutionException(String message) { + super(message); + } + + public ExecutionException(String message, Throwable cause) { + super(message, cause); + } + + public ExecutionException(Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionStatus.java ---------------------------------------------------------------------- diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionStatus.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionStatus.java new file mode 100644 index 0000000..77ee888 --- /dev/null +++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionStatus.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.sql.client.interfaces; + +/** + * Status of the execution of a SQL statement. 
+ */ +public enum ExecutionStatus { + New, + Running, + SuccessfulFinish, + UnsuccessfulFinish +} http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/NonQueryResult.java ---------------------------------------------------------------------- diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/NonQueryResult.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/NonQueryResult.java new file mode 100644 index 0000000..0173ad1 --- /dev/null +++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/NonQueryResult.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.sql.client.interfaces; + +import java.util.List; + +/** + * Result of a non-query SQL statement or SQL file containing multiple non-query statements. 
+ */ +public class NonQueryResult { + private int execId; // execution ID of the statement(s) submitted + private boolean success; // whether the statement(s) submitted successfully + + // When user submits a batch of SQL statements, only the non-query ones will be submitted + private List<String> submittedStmts; + private List<String> nonSubmittedStmts; + + public NonQueryResult(int execId, boolean success) { + this.execId = execId; + this.success = success; + } + + public NonQueryResult(int execId, boolean success, List<String> submittedStmts, List<String> nonSubmittedStmts) { + this.execId = execId; + this.success = success; + this.submittedStmts = submittedStmts; + this.nonSubmittedStmts = nonSubmittedStmts; + } + + public int getExecutionId() { + return execId; + } + + public boolean succeeded() { + return success; + } + + public List<String> getSubmittedStmts() { + return submittedStmts; + } + + public List<String> getNonSubmittedStmts() { + return nonSubmittedStmts; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/QueryResult.java ---------------------------------------------------------------------- diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/QueryResult.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/QueryResult.java new file mode 100644 index 0000000..6f54557 --- /dev/null +++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/QueryResult.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.sql.client.interfaces; + +/** + * Execution result of a SELECT statement. It doesn't contain data though. + */ +public class QueryResult { + private int execId; // execution ID of the statement(s) submitted + private boolean success; // whether the statement(s) submitted successfully + private SqlSchema schema; // The schema of the data coming from the query + + public QueryResult(int execId, SqlSchema schema, Boolean success) { + if (success && schema == null) + throw new IllegalArgumentException(); + this.execId = execId; + this.schema = schema; + this.success = success; + } + + public int getExecutionId() { + return execId; + } + + public SqlSchema getSchema() { + return schema; + } + + public boolean succeeded() { + return success; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlExecutor.java ---------------------------------------------------------------------- diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlExecutor.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlExecutor.java new file mode 100644 index 0000000..9d528f6 --- /dev/null +++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlExecutor.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.sql.client.interfaces; + + +import java.io.File; +import java.util.List; + + +/** + * Conventions: + * <p> + * Implementations shall report UNRECOVERABLE EXCEPTIONS by throwing + * ExecutionExceptions, though SqlExecutor doesn't enforce this, as we don't believe in + * Java checked exceptions. Report errors by returning values as indicated by each + * function and preparing for the subsequent getErrorMsg call. + * <p> + * Each execution (both query and non-query) shall return a non-negative execution ID (execId). + * Negative execution IDs are reserved for error handling. + * <p> + * User shall be able to query the status of an execution even after it's finished, so the + * executor shall keep record of all the executions unless being asked to remove them ( + * when removeExecution is called.) + * <p> + * IMPORTANT: An executor shall support two ways of supplying data: + * 1. Say user selects profiles of users who visited LinkedIn in the last 5 mins. There could + * be millions of rows, but the UI only needs to show a small fraction. That's retrieveQueryResult, + * accepting a row range (startRow and endRow).
Note that UI may ask for the same data over and over, + * like when user switches from page 1 to page 2 and data stream changes at the same time, the two + * pages may actually have overlapping or even the same data. + * <p> + * 2. Say user wants to see clicks on a LinkedIn page of a certain person from now on. In this mode + * consumeQueryResult shall be used. UI can keep asking for new rows, and once the rows are consumed, + * it's no longer necessary for the executor to keep them. If lots of rows come in, the UI may be only + * interested in the last certain rows (as it's in a logview mode), so all older data can be dropped. + */ +public interface SqlExecutor { + /** + * SqlExecutor shall be ready to accept all other calls after start() is called. + * However, it shall NOT store the ExecutionContext for future use, as each + * call will be given an ExecutionContext which may differ from this one. + * + * @param context The ExecutionContext at the time of the call. + */ + public void start(ExecutionContext context); + + /** + * Indicates no further calls will be made thus it's safe for the executor to clean up. + * + * @param context The ExecutionContext at the time of the call. + */ + public void stop(ExecutionContext context); + + /** + * @param context The ExecutionContext at the time of the call. + * @return null if an error occurs. Prepare for subsequent getErrorMsg call. + * an empty list indicates no tables found. + */ + public List<String> listTables(ExecutionContext context); + + /** + * @param context The ExecutionContext at the time of the call. + * @param tableName Name of the table to get the schema for. + * @return null if an error occurs. Prepare for subsequent getErrorMsg call. + */ + public SqlSchema getTableSchema(ExecutionContext context, String tableName); + + /** + * @param context The ExecutionContext at the time of the call. + * @param statement statement to execute + * @return The query result.
+ */ + public QueryResult executeQuery(ExecutionContext context, String statement); + + + /** + * @return how many rows are available for reading. + */ + public int getRowCount(); + + /** + * Row starts at 0. Executor shall keep the data retrieved. + * For now we get strings for display but we might want strongly typed values. + * + * @param context The ExecutionContext at the time of the call. + * @param startRow Start row index (inclusive) + * @param endRow End row index (inclusive) + * @return A list of row data represented by a String array. + */ + public List<String[]> retrieveQueryResult(ExecutionContext context, int startRow, int endRow); + + + /** + * Consumes rows from query result. Executor shall drop them, as "consume" indicates. + * ALL data before endRow (inclusive, including data before startRow) shall be deleted. + * + * @param context The ExecutionContext at the time of the call. + * @param startRow Start row index (inclusive) + * @param endRow End row index (inclusive) + * @return available data between startRow and endRow (both are inclusive) + */ + public List<String[]> consumeQueryResult(ExecutionContext context, int startRow, int endRow); + + /** + * Executes all the NON-QUERY statements in the sqlFile. + * Query statements are ignored as it won't make sense. + * + * @param context The ExecutionContext at the time of the call. + * @param file A File object to read statements from. + * @return Execution result. + */ + public NonQueryResult executeNonQuery(ExecutionContext context, File file); + + /** + * @param context The ExecutionContext at the time of the call. + * @param statements A list of non-query sql statements. + * @return Execution result. + */ + public NonQueryResult executeNonQuery(ExecutionContext context, List<String> statements); + + /** + * @param context The ExecutionContext at the time of the call. + * @param exeId Execution ID. + * @return Whether the operation succeeded or not.
+ */ + public boolean stopExecution(ExecutionContext context, int exeId); + + + /** + * Removing an ongoing execution shall result in an error. Stop it first. + * + * @param context The ExecutionContext at the time of the call + * @param exeId Execution ID. + * @return Whether the operation succeeded or not. + */ + public boolean removeExecution(ExecutionContext context, int exeId); + + /** + * @param execId Execution ID. + * @return ExecutionStatus. + */ + public ExecutionStatus queryExecutionStatus(int execId); + + /** + * @return The last error message of last function call. + */ + public String getErrorMsg(); + + /** + * @param context The ExecutionContext at the time of the call. + * @return A list of SqlFunction. + */ + List<SqlFunction> listFunctions(ExecutionContext context); +} http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlFunction.java ---------------------------------------------------------------------- diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlFunction.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlFunction.java new file mode 100644 index 0000000..3ac602c --- /dev/null +++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlFunction.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.sql.client.interfaces; + +import java.util.List; + +/** + * Represents a SQL function. + */ +public interface SqlFunction { + /** + * Gets the name of the function. + * @return name of the function + */ + public String getName(); + + /** + * Gets the description of the function. + * @return description of the function. + */ + public String getDescription(); + + /** + * Gets the argument types of the function as a List. + * @return A list containing the type names of the arguments. + */ + public List<String> getArgumentTypes(); + + /** + * Gets the return type of the function. + * @return return type name + */ + public String getReturnType(); + + /** + * Don't forget to implement toString() + */ +} http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlSchema.java ---------------------------------------------------------------------- diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlSchema.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlSchema.java new file mode 100644 index 0000000..7dcabdb --- /dev/null +++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlSchema.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.sql.client.interfaces; + + +import java.util.List; + +/** + * A primitive representation of SQL schema which is just for display purpose. + */ +public class SqlSchema { + private String[] names; // field names + private String[] typeNames; // names of field type + + public SqlSchema(List<String> colNames, List<String> colTypeNames) { + if (colNames == null || colNames.size() == 0 + || colTypeNames == null || colTypeNames.size() == 0 + || colNames.size() != colTypeNames.size()) + throw new IllegalArgumentException(); + + names = new String[colNames.size()]; + names = colNames.toArray(names); + + typeNames = new String[colTypeNames.size()]; + typeNames = colTypeNames.toArray(typeNames); + } + + public int getFieldCount() { + return names.length; + } + + public String getFieldName(int colIdx) { + return names[colIdx]; + } + + public String getFieldTypeName(int colIdx) { + return typeNames[colIdx]; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlSchemaBuilder.java ---------------------------------------------------------------------- diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlSchemaBuilder.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlSchemaBuilder.java new file mode 100644 index 
0000000..84e8a0e --- /dev/null +++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlSchemaBuilder.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.sql.client.interfaces; + +import java.util.ArrayList; +import java.util.List; + +/** + * Convenient class for building a SqlSchema object. 
+ */ +public class SqlSchemaBuilder { + private List<String> names = new ArrayList<>(); + private List<String> typeNames = new ArrayList<>(); + + private SqlSchemaBuilder() { + } + + public static SqlSchemaBuilder builder() { + return new SqlSchemaBuilder(); + } + + public SqlSchemaBuilder addField(String name, String fieldType) { + if (name == null || name.isEmpty() || fieldType == null) + throw new IllegalArgumentException(); + + names.add(name); + typeNames.add(fieldType); + return this; + } + + public SqlSchemaBuilder appendFields(List<String> names, List<String> typeNames) { + if (names == null || names.size() == 0 + || typeNames == null || typeNames.size() == 0 + || names.size() != typeNames.size()) + throw new IllegalArgumentException(); + + this.names.addAll(names); + this.typeNames.addAll(typeNames); + + return this; + } + + public SqlSchema toSchema() { + return new SqlSchema(names, typeNames); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliException.java ---------------------------------------------------------------------- diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliException.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliException.java new file mode 100755 index 0000000..743ff44 --- /dev/null +++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.sql.client.util; + +/** + * The exception used by the shell for unrecoverable errors. + */ +public class CliException extends RuntimeException { + public CliException() { + + } + + public CliException(String message) { + super(message); + } + + public CliException(String message, Throwable cause) { + super(message, cause); + } + + public CliException(Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliUtil.java ---------------------------------------------------------------------- diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliUtil.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliUtil.java new file mode 100755 index 0000000..03fe1b1 --- /dev/null +++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliUtil.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.sql.client.util; + +/** + * Convenient utility class with static methods. + */ +public class CliUtil { + public static boolean isNullOrEmpty(String str) { + return str == null || str.isEmpty(); + } + + public static int ceilingDiv(int x, int y) { + if (x < 0 || y <= 0) + throw new IllegalArgumentException(); + + return x / y + (x % y == 0 ? 0 : 1); + } + + public static StringBuilder appendTo(StringBuilder builder, int toPos, char c) { + for (int i = builder.length(); i <= toPos; ++i) { + builder.append(c); + } + return builder; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/RandomAccessQueue.java ---------------------------------------------------------------------- diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/RandomAccessQueue.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/RandomAccessQueue.java new file mode 100644 index 0000000..789d64c --- /dev/null +++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/RandomAccessQueue.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.sql.client.util; + +import java.lang.reflect.Array; +import java.util.ArrayList; +import java.util.List; + +/** + * A queue that supports random access and consumption. + * @param <T> Element type + */ +public class RandomAccessQueue<T> { + private T[] buffer; + private int capacity; + private int size; + private int head; + + public RandomAccessQueue(Class<T> t, int capacity) { + this.capacity = capacity; + head = 0; + size = 0; + + @SuppressWarnings("unchecked") final T[] b = (T[]) Array.newInstance(t, capacity); + buffer = b; + } + + public synchronized List<T> get(int start, int end) { + int lowerBound = Math.max(start, 0); + int upperBound = Math.min(end, size - 1); + List<T> rets = new ArrayList<>(); + for (int i = lowerBound; i <= upperBound; i++) { + rets.add(buffer[(head + i) % capacity]); + } + return rets; + } + + public synchronized T get(int index) { + if (index >= 0 && index < size) { + return buffer[(head + index) % capacity]; + } + throw new CliException("OutOfBoundaryError"); + } + + public synchronized void add(T t) { + if (size >= capacity) { + buffer[head] = t; + head = (head + 1) % capacity; + } else { + int pos = (head + size) % capacity; + buffer[pos] = t; + size++; + } + } + + /* + * Remove all element before 'end', and return elements between 'start' and 'end' + */ + public synchronized List<T> consume(int start, int end) { + List<T> rets = get(start, end); + int upperBound = Math.min(end, size - 1); + head = (end + 1) % capacity; + size -= (upperBound + 1); + return rets; + } + + public 
synchronized int getHead() { + return head; + } + + public synchronized int getSize() { + return size; + } + + public synchronized void clear() { + head = 0; + size = 0; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java ---------------------------------------------------------------------- diff --git a/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java b/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java new file mode 100644 index 0000000..18fe4b7 --- /dev/null +++ b/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.sql.client.impl; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import org.apache.samza.config.MapConfig; +import org.apache.samza.sql.client.interfaces.ExecutionContext; +import org.apache.samza.sql.client.interfaces.SqlSchema; +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.samza.sql.client.impl.SamzaExecutor.*; +import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.*; + + +public class SamzaExecutorTest { + private SamzaExecutor m_executor = new SamzaExecutor(); + + @Test + public void testGetTableSchema() { + ExecutionContext context = getExecutionContext(); + SqlSchema ts = m_executor.getTableSchema(context, "kafka.ProfileChangeStream"); + + Assert.assertEquals("Name", ts.getFieldName(0)); + Assert.assertEquals("NewCompany", ts.getFieldName(1)); + Assert.assertEquals("OldCompany", ts.getFieldName(2)); + Assert.assertEquals("ProfileChangeTimestamp", ts.getFieldName(3)); + Assert.assertEquals("STRING", ts.getFieldTypeName(0)); + Assert.assertEquals("STRING", ts.getFieldTypeName(1)); + Assert.assertEquals("STRING", ts.getFieldTypeName(2)); + Assert.assertEquals("INT64", ts.getFieldTypeName(3)); + } + + @Test + public void testGenerateResultSchema() { + ExecutionContext context = getExecutionContext(); + Map<String, String> mapConf = fetchSamzaSqlConfig(1, context); + SqlSchema ts = m_executor.generateResultSchema(new MapConfig(mapConf)); + + Assert.assertEquals("__key__", ts.getFieldName(0)); + Assert.assertEquals("Name", ts.getFieldName(1)); + Assert.assertEquals("NewCompany", ts.getFieldName(2)); + Assert.assertEquals("OldCompany", ts.getFieldName(3)); + Assert.assertEquals("ProfileChangeTimestamp", ts.getFieldName(4)); + Assert.assertEquals("VARCHAR", ts.getFieldTypeName(0)); + Assert.assertEquals("VARCHAR", ts.getFieldTypeName(1)); + Assert.assertEquals("VARCHAR", ts.getFieldTypeName(2)); + Assert.assertEquals("VARCHAR", ts.getFieldTypeName(3)); + 
Assert.assertEquals("BIGINT", ts.getFieldTypeName(4)); + } + + private ExecutionContext getExecutionContext() { + ClassLoader classLoader = getClass().getClassLoader(); + File file = new File(classLoader.getResource("ProfileChangeStream.avsc").getFile()); + Map<String, String> mapConf = new HashMap<>(); + mapConf.put("samza.sql.relSchemaProvider.config.schemaDir", file.getParent()); + mapConf.put(CFG_SQL_STMT, "insert into log.outputStream select * from kafka.ProfileChangeStream"); + return new ExecutionContext(mapConf); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/test/java/org/apache/samza/sql/client/util/RandomAccessQueueTest.java ---------------------------------------------------------------------- diff --git a/samza-sql-shell/src/test/java/org/apache/samza/sql/client/util/RandomAccessQueueTest.java b/samza-sql-shell/src/test/java/org/apache/samza/sql/client/util/RandomAccessQueueTest.java new file mode 100644 index 0000000..fe7a19a --- /dev/null +++ b/samza-sql-shell/src/test/java/org/apache/samza/sql/client/util/RandomAccessQueueTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.sql.client.util; + +import java.util.List; +import org.junit.Assert; +import org.junit.Test; + + +public class RandomAccessQueueTest { + private RandomAccessQueue m_queue; + public RandomAccessQueueTest() { + m_queue = new RandomAccessQueue<>(Integer.class, 5); + } + + @Test + public void testAddAndGetElement() { + m_queue.clear(); + for (int i = 0; i < 4; i++) { + m_queue.add(i); + } + Assert.assertEquals(0, m_queue.getHead()); + Assert.assertEquals(4, m_queue.getSize()); + Assert.assertEquals(0, m_queue.get(0)); + Assert.assertEquals(3, m_queue.get(3)); + + for (int i = 0; i < 3; i++) { + m_queue.add(4 + i); + } + int head = m_queue.getHead(); + Assert.assertEquals(2, head); + Assert.assertEquals(5, m_queue.getSize()); + Assert.assertEquals(2, m_queue.get(0)); + Assert.assertEquals(3, m_queue.get(1)); + Assert.assertEquals(4, m_queue.get(2)); + Assert.assertEquals(5, m_queue.get(3)); + Assert.assertEquals(6, m_queue.get(4)); + } + + @Test + public void testGetRange() { + m_queue.clear(); + for (int i = 0; i < 4; i++) { + m_queue.add(i); // 0, 1, 2, 3 + } + List<Integer> rets = m_queue.get(-1, 9); + Assert.assertEquals(4, rets.size()); + Assert.assertEquals(0, m_queue.get(0)); + Assert.assertEquals(3, m_queue.get(3)); + + for (int i = 0; i <= 2; i++) { + m_queue.add(4 + i); + } + rets = m_queue.get(0, 4); + Assert.assertEquals(2, rets.get(0).intValue()); + Assert.assertEquals(3, rets.get(1).intValue()); + Assert.assertEquals(4, rets.get(2).intValue()); + Assert.assertEquals(5, rets.get(3).intValue()); + Assert.assertEquals(6, rets.get(4).intValue()); + } + + @Test + public void testConsume() { + m_queue.clear(); + for (int i = 0; i < 4; i++) { + m_queue.add(i); // 0, 1, 2, 3 + } + List<Integer> rets = m_queue.consume(1, 2); + Assert.assertEquals(1, m_queue.getSize()); + Assert.assertEquals(3, m_queue.getHead()); + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/test/resources/ProfileChangeStream.avsc ---------------------------------------------------------------------- diff --git a/samza-sql-shell/src/test/resources/ProfileChangeStream.avsc b/samza-sql-shell/src/test/resources/ProfileChangeStream.avsc new file mode 100644 index 0000000..5c1e49d --- /dev/null +++ b/samza-sql-shell/src/test/resources/ProfileChangeStream.avsc @@ -0,0 +1,51 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ + +{ + "name": "ProfileChangeEvent", + "version" : 1, + "namespace": "com.linkedin.samza.tools.avro", + "type": "record", + "fields": [ + { + "name": "Name", + "doc": "Name of the profile.", + "type": ["null", "string"], + "default":null + }, + { + "name": "NewCompany", + "doc": "Name of the new company the person joined.", + "type": ["null", "string"], + "default":null + }, + { + "name": "OldCompany", + "doc": "Name of the old company the person was working.", + "type": ["null", "string"], + "default":null + }, + { + "name": "ProfileChangeTimestamp", + "doc": "Time at which the profile was changed.", + "type": ["null", "long"], + "default":null + } + ] +} http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java index 17df373..745c934 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java @@ -143,7 +143,7 @@ public class SamzaSqlApplicationConfig { }); } - private static <T> T initializePlugin(String pluginName, String plugin, Config staticConfig, + public static <T> T initializePlugin(String pluginName, String plugin, Config staticConfig, String pluginDomainFormat, BiFunction<Object, Config, T> factoryInvoker) { String pluginDomain = String.format(pluginDomainFormat, plugin); Config pluginConfig = staticConfig.subset(pluginDomain); http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/settings.gradle ---------------------------------------------------------------------- diff --git a/settings.gradle b/settings.gradle index 853a7bb..5b9df5c 100644 --- a/settings.gradle +++ b/settings.gradle @@ -27,6 
+27,7 @@ include \ 'samza-rest', 'samza-shell', 'samza-sql', + 'samza-sql-shell', 'samza-tools' def scalaModules = [
