HIVE-19210: Create separate module for streaming ingest (Prasanth Jayachandran reviewed by Eugene Koifman)
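This change copies the streaming ingest API out of hcatalog into a new top-level streaming module, package org.apache.hive.streaming. For existing users the practical impact is a new Maven artifact (org.apache.hive:hive-streaming in place of org.apache.hive.hcatalog:hive-hcatalog-streaming) and a new package prefix; the class names and entry points carry over, as the itests/hive-unit pom and TestCompactor hunks below illustrate. A minimal, hypothetical ingest sketch against the relocated API follows — the metastore URI, database, table, partition values, column names, and agent string are placeholders, and the constructors and methods used are the ones visible in the files added by this diff and their hcatalog predecessors:

import java.util.Arrays;

// New package: previously org.apache.hive.hcatalog.streaming
import org.apache.hive.streaming.DelimitedInputWriter;
import org.apache.hive.streaming.HiveEndPoint;
import org.apache.hive.streaming.StreamingConnection;
import org.apache.hive.streaming.StreamingException;
import org.apache.hive.streaming.TransactionBatch;

public class StreamingIngestExample {
  public static void main(String[] args)
      throws StreamingException, InterruptedException, ClassNotFoundException {
    // Placeholder endpoint: metastore thrift URI, database, table, partition values.
    HiveEndPoint endPoint = new HiveEndPoint("thrift://localhost:9083", "default",
        "alerts", Arrays.asList("2018", "04"));
    StreamingConnection connection = endPoint.newConnection(true, "example-agent");

    // Map the two delimited input fields onto the table columns "id" and "msg";
    // null HiveConf means no advanced settings are needed.
    DelimitedInputWriter writer =
        new DelimitedInputWriter(new String[]{"id", "msg"}, ",", endPoint, null, connection);

    TransactionBatch batch = connection.fetchTransactionBatch(10, writer);
    try {
      batch.beginNextTransaction();
      batch.write("1,hello".getBytes());
      batch.write("2,world".getBytes());
      batch.commit();
    } finally {
      batch.close();
      connection.close();
    }
  }
}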
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/67a8442b Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/67a8442b Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/67a8442b Branch: refs/heads/branch-3 Commit: 67a8442bda1ecccd31c5dea0c3d6ba23198a742e Parents: 473d1b9 Author: Prasanth Jayachandran <prasan...@apache.org> Authored: Mon Apr 16 15:47:53 2018 -0700 Committer: Prasanth Jayachandran <prasan...@apache.org> Committed: Mon Apr 16 15:47:58 2018 -0700 ---------------------------------------------------------------------- .../hive/hcatalog/streaming/package-info.java | 19 + itests/hive-unit/pom.xml | 4 +- .../hive/ql/txn/compactor/TestCompactor.java | 10 +- packaging/pom.xml | 5 + packaging/src/main/assembly/src.xml | 1 + pom.xml | 1 + streaming/pom.xml | 141 ++ .../hive/streaming/AbstractRecordWriter.java | 324 +++ .../apache/hive/streaming/ConnectionError.java | 35 + .../hive/streaming/DelimitedInputWriter.java | 331 +++ .../apache/hive/streaming/HeartBeatFailure.java | 33 + .../org/apache/hive/streaming/HiveEndPoint.java | 1117 +++++++++ .../hive/streaming/ImpersonationFailed.java | 25 + .../apache/hive/streaming/InvalidColumn.java | 26 + .../apache/hive/streaming/InvalidPartition.java | 28 + .../org/apache/hive/streaming/InvalidTable.java | 38 + .../hive/streaming/InvalidTrasactionState.java | 26 + .../hive/streaming/PartitionCreationFailed.java | 25 + .../hive/streaming/QueryFailedException.java | 28 + .../org/apache/hive/streaming/RecordWriter.java | 43 + .../hive/streaming/SerializationError.java | 26 + .../hive/streaming/StreamingConnection.java | 57 + .../hive/streaming/StreamingException.java | 28 + .../hive/streaming/StreamingIOFailure.java | 31 + .../apache/hive/streaming/StrictJsonWriter.java | 162 ++ .../hive/streaming/StrictRegexWriter.java | 189 ++ .../apache/hive/streaming/TransactionBatch.java | 125 + .../streaming/TransactionBatchUnAvailable.java | 25 + .../apache/hive/streaming/TransactionError.java | 29 + .../java/org/apache/hive/streaming/package.html | 181 ++ .../streaming/TestDelimitedInputWriter.java | 73 + .../apache/hive/streaming/TestStreaming.java | 2330 ++++++++++++++++++ 32 files changed, 5509 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/67a8442b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package-info.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package-info.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package-info.java new file mode 100644 index 0000000..36d6b13 --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package-info.java @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +@Deprecated // use org.apache.hive.streaming instead +package org.apache.hive.hcatalog.streaming; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/67a8442b/itests/hive-unit/pom.xml ---------------------------------------------------------------------- diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml index f473d25..0357afd 100644 --- a/itests/hive-unit/pom.xml +++ b/itests/hive-unit/pom.xml @@ -76,8 +76,8 @@ <version>${project.version}</version> </dependency> <dependency> - <groupId>org.apache.hive.hcatalog</groupId> - <artifactId>hive-hcatalog-streaming</artifactId> + <groupId>org.apache.hive</groupId> + <artifactId>hive-streaming</artifactId> <version>${project.version}</version> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/hive/blob/67a8442b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index 5966740..b19aa23 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -70,11 +70,11 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.mapred.JobConf; import org.apache.hive.hcatalog.common.HCatUtil; -import org.apache.hive.hcatalog.streaming.DelimitedInputWriter; -import org.apache.hive.hcatalog.streaming.HiveEndPoint; -import org.apache.hive.hcatalog.streaming.StreamingConnection; -import org.apache.hive.hcatalog.streaming.StreamingException; -import org.apache.hive.hcatalog.streaming.TransactionBatch; +import org.apache.hive.streaming.DelimitedInputWriter; +import org.apache.hive.streaming.HiveEndPoint; +import org.apache.hive.streaming.StreamingConnection; +import org.apache.hive.streaming.StreamingException; +import org.apache.hive.streaming.TransactionBatch; import org.apache.orc.OrcConf; import org.junit.After; import org.junit.Assert; http://git-wip-us.apache.org/repos/asf/hive/blob/67a8442b/packaging/pom.xml ---------------------------------------------------------------------- diff --git a/packaging/pom.xml b/packaging/pom.xml index 52ad6a2..8b03e2e 100644 --- a/packaging/pom.xml +++ b/packaging/pom.xml @@ -258,6 +258,11 @@ <version>${project.version}</version> </dependency> <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-streaming</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>org.apache.hive.hcatalog</groupId> <artifactId>hive-hcatalog-streaming</artifactId> <version>${project.version}</version> http://git-wip-us.apache.org/repos/asf/hive/blob/67a8442b/packaging/src/main/assembly/src.xml ---------------------------------------------------------------------- diff --git 
a/packaging/src/main/assembly/src.xml b/packaging/src/main/assembly/src.xml index 486fe52..c477194 100644 --- a/packaging/src/main/assembly/src.xml +++ b/packaging/src/main/assembly/src.xml @@ -97,6 +97,7 @@ <include>spark-client/**/*</include> <include>storage-api/**/*</include> <include>standalone-metastore/**/*</include> + <include>streaming/**/*</include> <include>testutils/**/*</include> <include>vector-code-gen/**/*</include> <include>kryo-registrator/**/*</include> http://git-wip-us.apache.org/repos/asf/hive/blob/67a8442b/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 76b11ef..190f74c 100644 --- a/pom.xml +++ b/pom.xml @@ -50,6 +50,7 @@ <module>serde</module> <module>service-rpc</module> <module>service</module> + <module>streaming</module> <module>llap-common</module> <module>llap-client</module> <module>llap-ext-client</module> http://git-wip-us.apache.org/repos/asf/hive/blob/67a8442b/streaming/pom.xml ---------------------------------------------------------------------- diff --git a/streaming/pom.xml b/streaming/pom.xml new file mode 100644 index 0000000..6da034a --- /dev/null +++ b/streaming/pom.xml @@ -0,0 +1,141 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.hive</groupId> + <artifactId>hive</artifactId> + <version>3.0.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>hive-streaming</artifactId> + <packaging>jar</packaging> + <name>Hive Streaming</name> + + <properties> + <hive.path.to.root>..</hive.path.to.root> + </properties> + + <dependencies> + <!-- dependencies are always listed in sorted order by groupId, artifectId --> + <!-- intra-project --> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-serde</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-metastore</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-exec</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-cli</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hive.hcatalog</groupId> + <artifactId>hive-hcatalog-core</artifactId> + <optional>true</optional> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <optional>true</optional> + <version>3.3.2</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <optional>true</optional> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <optional>true</optional> + <version>${hadoop.version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>commmons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + </exclusions> + </dependency> + + <!-- test --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>${junit.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-common</artifactId> + <scope>test</scope> + <version>${hadoop.version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>commmons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + </exclusions> + </dependency> + + </dependencies> + + <build> + <sourceDirectory>${basedir}/src/java</sourceDirectory> + <testSourceDirectory>${basedir}/src/test</testSourceDirectory> + <resources> + </resources> + <plugins> + <!-- plugins are always listed in sorted order by groupId, artifectId --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/hive/blob/67a8442b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java 
---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java new file mode 100644 index 0000000..25998ae --- /dev/null +++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + + +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.ql.io.RecordUpdater; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hive.hcatalog.common.HCatUtil; +import org.apache.thrift.TException; + +import java.io.IOException; + +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; + + +public abstract class AbstractRecordWriter implements RecordWriter { + static final private Logger LOG = LoggerFactory.getLogger(AbstractRecordWriter.class.getName()); + + private final HiveConf conf; + private final HiveEndPoint endPoint; + final Table tbl; + + private final IMetaStoreClient msClient; + final List<Integer> bucketIds; + private ArrayList<RecordUpdater> updaters = null; + + private final int totalBuckets; + /** + * Indicates whether target table is bucketed + */ + private final boolean isBucketed; + + private final Path partitionPath; + + private final AcidOutputFormat<?,?> outf; + private Object[] bucketFieldData; // Pre-allocated in constructor. Updated on each write. 
+ private Long curBatchMinWriteId; + private Long curBatchMaxWriteId; + + private static final class TableWriterPair { + private final Table tbl; + private final Path partitionPath; + TableWriterPair(Table t, Path p) { + tbl = t; + partitionPath = p; + } + } + /** + * @deprecated As of release 1.3/2.1. Replaced by {@link #AbstractRecordWriter(HiveEndPoint, HiveConf, StreamingConnection)} + */ + protected AbstractRecordWriter(HiveEndPoint endPoint, HiveConf conf) + throws ConnectionError, StreamingException { + this(endPoint, conf, null); + } + protected AbstractRecordWriter(HiveEndPoint endPoint2, HiveConf conf, StreamingConnection conn) + throws StreamingException { + this.endPoint = endPoint2; + this.conf = conf!=null ? conf + : HiveEndPoint.createHiveConf(DelimitedInputWriter.class, endPoint.metaStoreUri); + try { + msClient = HCatUtil.getHiveMetastoreClient(this.conf); + UserGroupInformation ugi = conn != null ? conn.getUserGroupInformation() : null; + if (ugi == null) { + this.tbl = msClient.getTable(endPoint.database, endPoint.table); + this.partitionPath = getPathForEndPoint(msClient, endPoint); + } else { + TableWriterPair twp = ugi.doAs( + new PrivilegedExceptionAction<TableWriterPair>() { + @Override + public TableWriterPair run() throws Exception { + return new TableWriterPair(msClient.getTable(endPoint.database, endPoint.table), + getPathForEndPoint(msClient, endPoint)); + } + }); + this.tbl = twp.tbl; + this.partitionPath = twp.partitionPath; + } + this.isBucketed = tbl.getSd().getNumBuckets() > 0; + /** + * For unbucketed tables we have exactly 1 RecrodUpdater for each AbstractRecordWriter which + * ends up writing to a file bucket_000000 + * See also {@link #getBucket(Object)} + */ + this.totalBuckets = isBucketed ? tbl.getSd().getNumBuckets() : 1; + if(isBucketed) { + this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols()); + this.bucketFieldData = new Object[bucketIds.size()]; + } + else { + bucketIds = Collections.emptyList(); + } + String outFormatName = this.tbl.getSd().getOutputFormat(); + outf = (AcidOutputFormat<?, ?>) ReflectionUtils.newInstance(JavaUtils.loadClass(outFormatName), conf); + } catch(InterruptedException e) { + throw new StreamingException(endPoint2.toString(), e); + } catch (MetaException | NoSuchObjectException e) { + throw new ConnectionError(endPoint2, e); + } catch (TException | ClassNotFoundException | IOException e) { + throw new StreamingException(e.getMessage(), e); + } + } + + /** + * used to tag error msgs to provied some breadcrumbs + */ + String getWatermark() { + return partitionPath + " writeIds[" + curBatchMinWriteId + "," + curBatchMaxWriteId + "]"; + } + // return the column numbers of the bucketed columns + private List<Integer> getBucketColIDs(List<String> bucketCols, List<FieldSchema> cols) { + ArrayList<Integer> result = new ArrayList<Integer>(bucketCols.size()); + HashSet<String> bucketSet = new HashSet<String>(bucketCols); + for (int i = 0; i < cols.size(); i++) { + if( bucketSet.contains(cols.get(i).getName()) ) { + result.add(i); + } + } + return result; + } + + /** + * Get the SerDe for the Objects created by {@link #encode}. This is public so that test + * frameworks can use it. + * @return serde + * @throws SerializationError + */ + public abstract AbstractSerDe getSerde() throws SerializationError; + + /** + * Encode a record as an Object that Hive can read with the ObjectInspector associated with the + * serde returned by {@link #getSerde}. This is public so that test frameworks can use it. 
+ * @param record record to be deserialized + * @return deserialized record as an Object + * @throws SerializationError + */ + public abstract Object encode(byte[] record) throws SerializationError; + + protected abstract ObjectInspector[] getBucketObjectInspectors(); + protected abstract StructObjectInspector getRecordObjectInspector(); + protected abstract StructField[] getBucketStructFields(); + + // returns the bucket number to which the record belongs to + protected int getBucket(Object row) throws SerializationError { + if(!isBucketed) { + return 0; + } + ObjectInspector[] inspectors = getBucketObjectInspectors(); + Object[] bucketFields = getBucketFields(row); + return ObjectInspectorUtils.getBucketNumber(bucketFields, inspectors, totalBuckets); + } + + @Override + public void flush() throws StreamingIOFailure { + try { + for (RecordUpdater updater : updaters) { + if (updater != null) { + updater.flush(); + } + } + } catch (IOException e) { + throw new StreamingIOFailure("Unable to flush recordUpdater", e); + } + } + + @Override + public void clear() throws StreamingIOFailure { + } + + /** + * Creates a new record updater for the new batch + * @param minWriteId smallest writeid in the batch + * @param maxWriteID largest writeid in the batch + * @throws StreamingIOFailure if failed to create record updater + */ + @Override + public void newBatch(Long minWriteId, Long maxWriteID) + throws StreamingIOFailure, SerializationError { + curBatchMinWriteId = minWriteId; + curBatchMaxWriteId = maxWriteID; + updaters = new ArrayList<RecordUpdater>(totalBuckets); + for (int bucket = 0; bucket < totalBuckets; bucket++) { + updaters.add(bucket, null);//so that get(i) returns null rather than ArrayOutOfBounds + } + } + + @Override + public void closeBatch() throws StreamingIOFailure { + boolean haveError = false; + for (RecordUpdater updater : updaters) { + if (updater != null) { + try { + //try not to leave any files open + updater.close(false); + } catch (Exception ex) { + haveError = true; + LOG.error("Unable to close " + updater + " due to: " + ex.getMessage(), ex); + } + } + } + updaters.clear(); + if(haveError) { + throw new StreamingIOFailure("Encountered errors while closing (see logs) " + getWatermark()); + } + } + + protected static ObjectInspector[] getObjectInspectorsForBucketedCols(List<Integer> bucketIds + , StructObjectInspector recordObjInspector) + throws SerializationError { + ObjectInspector[] result = new ObjectInspector[bucketIds.size()]; + + for (int i = 0; i < bucketIds.size(); i++) { + int bucketId = bucketIds.get(i); + result[i] = + recordObjInspector.getAllStructFieldRefs().get( bucketId ).getFieldObjectInspector(); + } + return result; + } + + + private Object[] getBucketFields(Object row) throws SerializationError { + StructObjectInspector recordObjInspector = getRecordObjectInspector(); + StructField[] bucketStructFields = getBucketStructFields(); + for (int i = 0; i < bucketIds.size(); i++) { + bucketFieldData[i] = recordObjInspector.getStructFieldData(row, bucketStructFields[i]); + } + return bucketFieldData; + } + + private RecordUpdater createRecordUpdater(int bucketId, Long minWriteId, Long maxWriteID) + throws IOException, SerializationError { + try { + // Initialize table properties from the table parameters. This is required because the table + // may define certain table parameters that may be required while writing. The table parameter + // 'transactional_properties' is one such example. 
+ Properties tblProperties = new Properties(); + tblProperties.putAll(tbl.getParameters()); + return outf.getRecordUpdater(partitionPath, + new AcidOutputFormat.Options(conf) + .inspector(getSerde().getObjectInspector()) + .bucket(bucketId) + .tableProperties(tblProperties) + .minimumWriteId(minWriteId) + .maximumWriteId(maxWriteID) + .statementId(-1) + .finalDestination(partitionPath)); + } catch (SerDeException e) { + throw new SerializationError("Failed to get object inspector from Serde " + + getSerde().getClass().getName(), e); + } + } + + RecordUpdater getRecordUpdater(int bucketId) throws StreamingIOFailure, SerializationError { + RecordUpdater recordUpdater = updaters.get(bucketId); + if (recordUpdater == null) { + try { + recordUpdater = createRecordUpdater(bucketId, curBatchMinWriteId, curBatchMaxWriteId); + } catch (IOException e) { + String errMsg = "Failed creating RecordUpdater for " + getWatermark(); + LOG.error(errMsg, e); + throw new StreamingIOFailure(errMsg, e); + } + updaters.set(bucketId, recordUpdater); + } + return recordUpdater; + } + + private Path getPathForEndPoint(IMetaStoreClient msClient, HiveEndPoint endPoint) + throws StreamingException { + try { + String location; + if(endPoint.partitionVals==null || endPoint.partitionVals.isEmpty() ) { + location = msClient.getTable(endPoint.database,endPoint.table) + .getSd().getLocation(); + } else { + location = msClient.getPartition(endPoint.database, endPoint.table, + endPoint.partitionVals).getSd().getLocation(); + } + return new Path(location); + } catch (TException e) { + throw new StreamingException(e.getMessage() + + ". Unable to get path for end point: " + + endPoint.partitionVals, e); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/67a8442b/streaming/src/java/org/apache/hive/streaming/ConnectionError.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/ConnectionError.java b/streaming/src/java/org/apache/hive/streaming/ConnectionError.java new file mode 100644 index 0000000..668bffb --- /dev/null +++ b/streaming/src/java/org/apache/hive/streaming/ConnectionError.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + +public class ConnectionError extends StreamingException { + + public ConnectionError(String msg) { + super(msg); + } + + public ConnectionError(String msg, Exception innerEx) { + super(msg, innerEx); + } + + public ConnectionError(HiveEndPoint endPoint, Exception innerEx) { + super("Error connecting to " + endPoint + + (innerEx == null ? 
"" : ": " + innerEx.getMessage()), innerEx); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/67a8442b/streaming/src/java/org/apache/hive/streaming/DelimitedInputWriter.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/DelimitedInputWriter.java b/streaming/src/java/org/apache/hive/streaming/DelimitedInputWriter.java new file mode 100644 index 0000000..898b3f9 --- /dev/null +++ b/streaming/src/java/org/apache/hive/streaming/DelimitedInputWriter.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.io.BytesWritable; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + * Streaming Writer handles delimited input (eg. CSV). + * Delimited input is parsed & reordered to match column order in table + * Uses Lazy Simple Serde to process delimited input + */ +public class DelimitedInputWriter extends AbstractRecordWriter { + private final boolean reorderingNeeded; + private String delimiter; + private char serdeSeparator; + private int[] fieldToColMapping; + private final ArrayList<String> tableColumns; + private LazySimpleSerDe serde = null; + + private final LazySimpleStructObjectInspector recordObjInspector; + private final ObjectInspector[] bucketObjInspectors; + private final StructField[] bucketStructFields; + + static final private Logger LOG = LoggerFactory.getLogger(DelimitedInputWriter.class.getName()); + + /** Constructor. Uses default separator of the LazySimpleSerde + * @param colNamesForFields Column name assignment for input fields. 
nulls or empty + * strings in the array indicates the fields to be skipped + * @param delimiter input field delimiter + * @param endPoint Hive endpoint + * @throws ConnectionError Problem talking to Hive + * @throws ClassNotFoundException Serde class not found + * @throws SerializationError Serde initialization/interaction failed + * @throws StreamingException Problem acquiring file system path for partition + * @throws InvalidColumn any element in colNamesForFields refers to a non existing column + */ + public DelimitedInputWriter(String[] colNamesForFields, String delimiter, + HiveEndPoint endPoint, StreamingConnection conn) + throws ClassNotFoundException, ConnectionError, SerializationError, + InvalidColumn, StreamingException { + this(colNamesForFields, delimiter, endPoint, null, conn); + } + /** Constructor. Uses default separator of the LazySimpleSerde + * @param colNamesForFields Column name assignment for input fields. nulls or empty + * strings in the array indicates the fields to be skipped + * @param delimiter input field delimiter + * @param endPoint Hive endpoint + * @param conf a Hive conf object. Can be null if not using advanced hive settings. + * @throws ConnectionError Problem talking to Hive + * @throws ClassNotFoundException Serde class not found + * @throws SerializationError Serde initialization/interaction failed + * @throws StreamingException Problem acquiring file system path for partition + * @throws InvalidColumn any element in colNamesForFields refers to a non existing column + */ + public DelimitedInputWriter(String[] colNamesForFields, String delimiter, + HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn) + throws ClassNotFoundException, ConnectionError, SerializationError, + InvalidColumn, StreamingException { + this(colNamesForFields, delimiter, endPoint, conf, + (char) LazySerDeParameters.DefaultSeparators[0], conn); + } + /** + * Constructor. Allows overriding separator of the LazySimpleSerde + * @param colNamesForFields Column name assignment for input fields + * @param delimiter input field delimiter + * @param endPoint Hive endpoint + * @param conf a Hive conf object. Set to null if not using advanced hive settings. + * @param serdeSeparator separator used when encoding data that is fed into the + * LazySimpleSerde. 
Ensure this separator does not occur + * in the field data + * @param conn connection this Writer is to be used with + * @throws ConnectionError Problem talking to Hive + * @throws ClassNotFoundException Serde class not found + * @throws SerializationError Serde initialization/interaction failed + * @throws StreamingException Problem acquiring file system path for partition + * @throws InvalidColumn any element in colNamesForFields refers to a non existing column + */ + public DelimitedInputWriter(String[] colNamesForFields, String delimiter, + HiveEndPoint endPoint, HiveConf conf, char serdeSeparator, StreamingConnection conn) + throws ClassNotFoundException, ConnectionError, SerializationError, + InvalidColumn, StreamingException { + super(endPoint, conf, conn); + this.tableColumns = getCols(tbl); + this.serdeSeparator = serdeSeparator; + this.delimiter = delimiter; + this.fieldToColMapping = getFieldReordering(colNamesForFields, getTableColumns()); + this.reorderingNeeded = isReorderingNeeded(delimiter, getTableColumns()); + LOG.debug("Field reordering needed = " + this.reorderingNeeded + ", for endpoint " + endPoint); + this.serdeSeparator = serdeSeparator; + this.serde = createSerde(tbl, conf, serdeSeparator); + + // get ObjInspectors for entire record and bucketed cols + try { + this.recordObjInspector = (LazySimpleStructObjectInspector) serde.getObjectInspector(); + this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector); + } catch (SerDeException e) { + throw new SerializationError("Unable to get ObjectInspector for bucket columns", e); + } + + // get StructFields for bucketed cols + bucketStructFields = new StructField[bucketIds.size()]; + List<? extends StructField> allFields = recordObjInspector.getAllStructFieldRefs(); + for (int i = 0; i < bucketIds.size(); i++) { + bucketStructFields[i] = allFields.get(bucketIds.get(i)); + } + } + /** + * @deprecated As of release 1.3/2.1. Replaced by {@link #DelimitedInputWriter(String[], String, HiveEndPoint, StreamingConnection)} + */ + public DelimitedInputWriter(String[] colNamesForFields, String delimiter, + HiveEndPoint endPoint) + throws ClassNotFoundException, ConnectionError, SerializationError, + InvalidColumn, StreamingException { + this(colNamesForFields, delimiter, endPoint, null, null); + } + /** + * @deprecated As of release 1.3/2.1. Replaced by {@link #DelimitedInputWriter(String[], String, HiveEndPoint, HiveConf, StreamingConnection)} + */ + public DelimitedInputWriter(String[] colNamesForFields, String delimiter, + HiveEndPoint endPoint, HiveConf conf) + throws ClassNotFoundException, ConnectionError, SerializationError, + InvalidColumn, StreamingException { + this(colNamesForFields, delimiter, endPoint, conf, + (char) LazySerDeParameters.DefaultSeparators[0], null); + } + /** + * @deprecated As of release 1.3/2.1. 
Replaced by {@link #DelimitedInputWriter(String[], String, HiveEndPoint, HiveConf, char, StreamingConnection)} + */ + public DelimitedInputWriter(String[] colNamesForFields, String delimiter, + HiveEndPoint endPoint, HiveConf conf, char serdeSeparator) + throws ClassNotFoundException, StreamingException { + this(colNamesForFields, delimiter, endPoint, conf, serdeSeparator, null); + } + + private boolean isReorderingNeeded(String delimiter, ArrayList<String> tableColumns) { + return !( delimiter.equals(String.valueOf(getSerdeSeparator())) + && areFieldsInColOrder(fieldToColMapping) + && tableColumns.size()>=fieldToColMapping.length ); + } + + private static boolean areFieldsInColOrder(int[] fieldToColMapping) { + for(int i=0; i<fieldToColMapping.length; ++i) { + if(fieldToColMapping[i]!=i) { + return false; + } + } + return true; + } + + @VisibleForTesting + static int[] getFieldReordering(String[] colNamesForFields, List<String> tableColNames) + throws InvalidColumn { + int[] result = new int[ colNamesForFields.length ]; + for(int i=0; i<colNamesForFields.length; ++i) { + result[i] = -1; + } + int i=-1, fieldLabelCount=0; + for( String col : colNamesForFields ) { + ++i; + if(col == null) { + continue; + } + if( col.trim().isEmpty() ) { + continue; + } + ++fieldLabelCount; + int loc = tableColNames.indexOf(col); + if(loc == -1) { + throw new InvalidColumn("Column '" + col + "' not found in table for input field " + i+1); + } + result[i] = loc; + } + if(fieldLabelCount>tableColNames.size()) { + throw new InvalidColumn("Number of field names exceeds the number of columns in table"); + } + return result; + } + + // Reorder fields in record based on the order of columns in the table + protected byte[] reorderFields(byte[] record) throws UnsupportedEncodingException { + if(!reorderingNeeded) { + return record; + } + String[] reorderedFields = new String[getTableColumns().size()]; + String decoded = new String(record); + String[] fields = decoded.split(delimiter,-1); + for (int i=0; i<fieldToColMapping.length; ++i) { + int newIndex = fieldToColMapping[i]; + if(newIndex != -1) { + reorderedFields[newIndex] = fields[i]; + } + } + return join(reorderedFields, getSerdeSeparator()); + } + + // handles nulls in items[] + // TODO: perhaps can be made more efficient by creating a byte[] directly + private static byte[] join(String[] items, char separator) { + StringBuilder buff = new StringBuilder(100); + if(items.length == 0) + return "".getBytes(); + int i=0; + for(; i<items.length-1; ++i) { + if(items[i]!=null) { + buff.append(items[i]); + } + buff.append(separator); + } + if(items[i]!=null) { + buff.append(items[i]); + } + return buff.toString().getBytes(); + } + + protected ArrayList<String> getTableColumns() { + return tableColumns; + } + + @Override + public void write(long writeId, byte[] record) + throws SerializationError, StreamingIOFailure { + try { + byte[] orderedFields = reorderFields(record); + Object encodedRow = encode(orderedFields); + int bucket = getBucket(encodedRow); + getRecordUpdater(bucket).insert(writeId, encodedRow); + } catch (IOException e) { + throw new StreamingIOFailure("Error writing record in transaction write id (" + + writeId + ")", e); + } + } + + @Override + public AbstractSerDe getSerde() { + return serde; + } + + protected LazySimpleStructObjectInspector getRecordObjectInspector() { + return recordObjInspector; + } + + @Override + protected StructField[] getBucketStructFields() { + return bucketStructFields; + } + + protected ObjectInspector[] 
getBucketObjectInspectors() { + return bucketObjInspectors; + } + + @Override + public Object encode(byte[] record) throws SerializationError { + try { + BytesWritable blob = new BytesWritable(); + blob.set(record, 0, record.length); + return serde.deserialize(blob); + } catch (SerDeException e) { + throw new SerializationError("Unable to convert byte[] record into Object", e); + } + } + + /** + * Creates LazySimpleSerde + * @return + * @throws SerializationError if serde could not be initialized + * @param tbl + */ + protected static LazySimpleSerDe createSerde(Table tbl, HiveConf conf, char serdeSeparator) + throws SerializationError { + try { + Properties tableProps = MetaStoreUtils.getTableMetadata(tbl); + tableProps.setProperty("field.delim", String.valueOf(serdeSeparator)); + LazySimpleSerDe serde = new LazySimpleSerDe(); + SerDeUtils.initializeSerDe(serde, conf, tableProps, null); + return serde; + } catch (SerDeException e) { + throw new SerializationError("Error initializing serde", e); + } + } + + private ArrayList<String> getCols(Table table) { + List<FieldSchema> cols = table.getSd().getCols(); + ArrayList<String> colNames = new ArrayList<String>(cols.size()); + for (FieldSchema col : cols) { + colNames.add(col.getName().toLowerCase()); + } + return colNames; + } + + public char getSerdeSeparator() { + return serdeSeparator; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/67a8442b/streaming/src/java/org/apache/hive/streaming/HeartBeatFailure.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/HeartBeatFailure.java b/streaming/src/java/org/apache/hive/streaming/HeartBeatFailure.java new file mode 100644 index 0000000..b1f9520 --- /dev/null +++ b/streaming/src/java/org/apache/hive/streaming/HeartBeatFailure.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + +import java.util.Collection; +import java.util.Set; + +public class HeartBeatFailure extends StreamingException { + private Collection<Long> abortedTxns; + private Collection<Long> nosuchTxns; + + public HeartBeatFailure(Collection<Long> abortedTxns, Set<Long> nosuchTxns) { + super("Heart beat error. InvalidTxns: " + nosuchTxns + ". AbortedTxns: " + abortedTxns); + this.abortedTxns = abortedTxns; + this.nosuchTxns = nosuchTxns; + } +}
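HeartBeatFailure, the last file in this excerpt, is thrown when a periodic heartbeat discovers that some of a batch's transactions have been aborted or are no longer known to the metastore. The sketch below shows where it would surface in a long-running writer loop; it assumes TransactionBatch.heartbeat() carries over unchanged from the hcatalog API (TransactionBatch.java appears in the diffstat above but its body is not reproduced in this mail), and the connection, writer, record source, batch size of 10, and 1000-record chunking are all placeholders chosen for illustration:

import java.util.Iterator;

import org.apache.hive.streaming.HeartBeatFailure;
import org.apache.hive.streaming.RecordWriter;
import org.apache.hive.streaming.StreamingConnection;
import org.apache.hive.streaming.StreamingException;
import org.apache.hive.streaming.TransactionBatch;

public class HeartbeatingIngest {

  /**
   * Drains one transaction batch, heartbeating between transactions so the
   * metastore does not time out the batch's locks while records trickle in.
   */
  static void ingest(StreamingConnection connection, RecordWriter writer,
                     Iterator<byte[]> records)
      throws StreamingException, InterruptedException {
    TransactionBatch batch = connection.fetchTransactionBatch(10, writer);
    try {
      while (records.hasNext() && batch.remainingTransactions() > 0) {
        batch.beginNextTransaction();
        // Write up to 1000 records per transaction (arbitrary chunk size).
        for (int i = 0; i < 1000 && records.hasNext(); i++) {
          batch.write(records.next());
        }
        try {
          // Assumed unchanged from the hcatalog API: renews the locks backing
          // the batch's remaining transactions.
          batch.heartbeat();
        } catch (HeartBeatFailure hbf) {
          // Some transactions were aborted or are unknown on the metastore side;
          // give up on this batch and let the caller fetch a fresh one.
          batch.abort();
          throw hbf;
        }
        batch.commit();
      }
    } finally {
      batch.close();
    }
  }
}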