http://git-wip-us.apache.org/repos/asf/hive/blob/67a8442b/streaming/src/java/org/apache/hive/streaming/package.html
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/package.html b/streaming/src/java/org/apache/hive/streaming/package.html
new file mode 100644
index 0000000..2b45792
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/package.html
@@ -0,0 +1,181 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"
+        "http://www.w3.org/TR/html4/loose.dtd">
+
+<html lang="en">
+
+<head>
+<meta name=Title content="HCatalog Streaming API">
+<meta name=Keywords content="HCatalog Streaming ACID">
+<meta http-equiv=Content-Type content="text/html; charset=macintosh">
+<title>HCatalog Streaming API</title>
+</head>
+
+<body>
+
+<h1>HCatalog Streaming API -- high level description</h1>
+
+<b>NOTE: The Streaming API feature is provided as a technology
+preview. The API may undergo incompatible changes in upcoming
+releases.</b>
+
+<p>
+Traditionally, adding new data to Hive requires gathering a large
+amount of data onto HDFS and then periodically adding a new
+partition. This is essentially a <i>batch insertion</i>. Insertion of
+new data into an existing partition or table is not done in a way that
+gives consistent results to readers. The Hive Streaming API allows data to
+be pumped continuously into Hive. The incoming data can be
+continuously committed in small batches of records into a Hive
+partition. Once data is committed it becomes immediately visible to
+all Hive queries initiated subsequently.</p>
+
+<p>
+This API is intended for streaming clients such as NiFi, Flume and Storm,
+which continuously generate data. Streaming support is built on top of
+ACID-based insert/update support in Hive.</p>
+
+<p>
+The classes and interfaces that make up the Hive streaming API fall
+broadly into two sets. The first set provides support for connection
+and transaction management, while the second set provides I/O
+support. Transactions are managed by the Hive MetaStore. Writes are
+performed to HDFS via Hive wrapper APIs that bypass the MetaStore.</p>
+
+<p>
+<b>Note on packaging</b>: The APIs are defined in the
+<b>org.apache.hive.streaming</b> Java package and included as
+the hive-streaming jar.</p>
+
+<h2>STREAMING REQUIREMENTS</h2>
+
+<p>
+A few things are currently required to use streaming.
+</p>
+
+<p>
+<ol>
+  <li> Currently, only the ORC storage format is supported, so
+    '<b>stored as orc</b>' must be specified during table creation.</li>
+  <li> The Hive table may be bucketed but must not be sorted.</li>
+  <li> The user of the client streaming process must have the necessary
+    permissions to write to the table or partition and to create partitions in
+    the table.</li>
+  <li> Currently, when issuing queries on streaming tables, the query client must set
+    <ol>
+      <li><b>hive.input.format =
+              org.apache.hadoop.hive.ql.io.HiveInputFormat</b></li>
+    </ol>
+    This client setting is a temporary requirement and the intention is to
+    drop the need for it in the near future.</li>
+  <li> Settings required in hive-site.xml for the Metastore
+    (a configuration sketch follows this list):
+    <ol>
+      <li><b>hive.txn.manager =
+              org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</b></li>
+      <li><b>hive.support.concurrency = true</b></li>
+      <li><b>hive.compactor.initiator.on = true</b></li>
+      <li><b>hive.compactor.worker.threads > 0</b></li>
+    </ol></li>
+</ol></p>
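+<p>
+The metastore-side settings listed above normally live in hive-site.xml.
+The sketch below is illustrative only: the property names and values come
+from the list above, while the class name and the idea of setting them on a
+client-side <b>HiveConf</b> (useful mainly for embedded or test metastores)
+are an example rather than a requirement of this API:</p>
+
+<pre>
+import org.apache.hadoop.hive.conf.HiveConf;
+
+public class StreamingConfSketch {
+  // Builds a HiveConf carrying the transaction/compaction settings that the
+  // requirements list above asks for. In a normal deployment these belong in
+  // the Metastore's hive-site.xml rather than in client code.
+  public static HiveConf streamingConf() {
+    HiveConf conf = new HiveConf();
+    conf.set("hive.txn.manager",
+        "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+    conf.set("hive.support.concurrency", "true");
+    conf.set("hive.compactor.initiator.on", "true");
+    conf.set("hive.compactor.worker.threads", "1");
+    // Query-side requirement from item 4 above; it matters to query clients
+    // reading the streamed data rather than to the streaming writer itself.
+    conf.set("hive.input.format",
+        "org.apache.hadoop.hive.ql.io.HiveInputFormat");
+    return conf;
+  }
+}
+</pre>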
+<p>
+<b>Note:</b> Streaming to <b>unpartitioned</b> tables is also
+supported.</p>
+
+<h2>Transaction and Connection Management</h2>
+
+<p>
+The class <a href="HiveEndPoint.html"><b>HiveEndPoint</b></a> describes a
+Hive endpoint to connect to. An endpoint is either a Hive table or
+partition. An endpoint is cheap to create and does not internally hold
+on to any network connections. Invoking the newConnection method on
+it creates a new connection to the Hive MetaStore for streaming
+purposes. It returns a
+<a href="StreamingConnection.html"><b>StreamingConnection</b></a>
+object. Multiple connections can be established on the same
+endpoint. The StreamingConnection can then be used to initiate new
+transactions for performing I/O.</p>
+
+<h3>Dynamic Partition Creation:</h3> In a setup where data is being
+streamed continuously (e.g. by Flume), it is often desirable to have new
+partitions created automatically (say, on an hourly basis). In such cases,
+requiring the Hive admin to pre-create the necessary partitions may not
+be reasonable. Consequently, the streaming API allows streaming clients
+to create partitions as needed.
+<b>HiveEndPoint.newConnection()</b> accepts an argument that indicates
+whether the partition should be auto-created. Since partition creation is
+an atomic action, multiple clients can race to create the partition, but
+only one will succeed, so streaming clients need not synchronize when
+creating a partition. The user of the client process needs to be given
+write permissions on the Hive table in order to create partitions.
+
+<h3>Batching Transactions:</h3> Transactions are implemented slightly
+differently than in traditional database systems. Multiple transactions
+are grouped into a <i>Transaction Batch</i> and each transaction has
+an id. Data from each transaction batch is written to a single file on
+HDFS, which eventually gets compacted with other files into a larger
+file automatically for efficiency.
+
+<h3>Basic Steps:</h3> After a connection is established, a streaming
+client first requests a new batch of transactions. In response it
+receives a set of transaction ids that are part of the transaction
+batch. Subsequently the client proceeds to consume one transaction at
+a time by initiating new transactions. The client will write() one or more
+records per transaction and either commit or abort the current
+transaction before switching to the next one. Each
+<b>TransactionBatch.write()</b> invocation automatically associates
+the I/O attempt with the current transaction id. The user of the
+streaming client needs to have write permissions to the partition or
+table. A minimal end-to-end sketch follows the concurrency note below.
+
+<p>
+<b>Concurrency Note:</b> I/O can be performed on multiple
+<b>TransactionBatch</b>es concurrently. However, the transactions within a
+transaction batch must be consumed sequentially.</p>
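+<p>
+The following sketch illustrates these basic steps using the
+<b>DelimitedInputWriter</b> described in the next section. It is
+illustrative only: the metastore URI, database, table, partition values
+and column names are placeholders, and the exact constructor and
+<b>newConnection</b> overloads shown here should be checked against the
+linked class documentation rather than taken from this sketch.</p>
+
+<pre>
+import java.util.Arrays;
+
+import org.apache.hive.streaming.DelimitedInputWriter;
+import org.apache.hive.streaming.HiveEndPoint;
+import org.apache.hive.streaming.StreamingConnection;
+import org.apache.hive.streaming.TransactionBatch;
+
+public class StreamingExample {
+  public static void main(String[] args) throws Exception {
+    // Placeholder endpoint: metastore URI, database, table and partition
+    // values must refer to an existing ORC, bucketed, non-sorted table.
+    HiveEndPoint endPoint = new HiveEndPoint("thrift://localhost:9083",
+        "testDb", "alerts", Arrays.asList("Asia", "India"));
+
+    // Map the two delimited input fields onto the table columns.
+    DelimitedInputWriter writer =
+        new DelimitedInputWriter(new String[]{"id", "msg"}, ",", endPoint);
+
+    // true => auto-create the partition if it does not exist yet.
+    StreamingConnection connection = endPoint.newConnection(true);
+    TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
+    try {
+      // Consume one transaction from the batch: write records, then commit.
+      txnBatch.beginNextTransaction();
+      txnBatch.write("1,Hello streaming".getBytes());
+      txnBatch.write("2,Welcome to streaming".getBytes());
+      txnBatch.commit();
+    } finally {
+      txnBatch.close();
+      connection.close();
+    }
+  }
+}
+</pre>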
+<h2>Writing Data</h2>
+
+<p>
+These classes and interfaces provide support for writing data to
+Hive within a transaction.
+<a href="RecordWriter.html"><b>RecordWriter</b></a> is the interface
+implemented by all writers. A writer is responsible for taking a
+record in the form of a <b>byte[]</b> containing data in a known
+format (e.g. CSV) and writing it out in the format supported by Hive
+streaming. A <b>RecordWriter</b> may reorder or drop fields from the incoming
+record if necessary to map them to the corresponding columns in the
+Hive table. A streaming client will instantiate an appropriate
+<b>RecordWriter</b> type and pass it to
+<b>StreamingConnection.fetchTransactionBatch()</b>. The streaming client
+does not directly interact with the <b>RecordWriter</b> thereafter, but
+relies on the <b>TransactionBatch</b> to do so.</p>
+
+<p>
+Currently, out of the box, the streaming API provides three
+implementations of the <b>RecordWriter</b> interface: one handles delimited
+input data (such as CSV, tab separated, etc.), one handles JSON
+(strict syntax), and one handles text input matched by a regular
+expression. Support for other input formats can be provided by
+additional implementations of the <b>RecordWriter</b> interface. A short
+sketch using the JSON writer follows the list below.
+<ul>
+<li> <a href="DelimitedInputWriter.html"><b>DelimitedInputWriter</b></a>
+- delimited text input.</li>
+<li> <a href="StrictJsonWriter.html"><b>StrictJsonWriter</b></a>
+- JSON text input.</li>
+<li> <a href="StrictRegexWriter.html"><b>StrictRegexWriter</b></a>
+- text input matched with a regex.</li>
+</ul></p>
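+<p>
+The sketch below illustrates the JSON writer. It follows the same
+connection/transaction pattern as the earlier delimited example; the field
+names and values are placeholders, and the constructor shown (taking just
+the endpoint) should be checked against the <b>StrictJsonWriter</b> class
+documentation.</p>
+
+<pre>
+import org.apache.hive.streaming.HiveEndPoint;
+import org.apache.hive.streaming.StreamingConnection;
+import org.apache.hive.streaming.StrictJsonWriter;
+import org.apache.hive.streaming.TransactionBatch;
+
+public class JsonStreamingSketch {
+  // Writes one JSON record to the given endpoint using the same
+  // connection/transaction pattern as the delimited example above.
+  static void writeOneRecord(HiveEndPoint endPoint) throws Exception {
+    StrictJsonWriter writer = new StrictJsonWriter(endPoint);
+    StreamingConnection connection = endPoint.newConnection(true);
+    TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
+    try {
+      txnBatch.beginNextTransaction();
+      txnBatch.write("{\"id\": 1, \"msg\": \"Hello streaming\"}".getBytes());
+      txnBatch.commit();
+    } finally {
+      txnBatch.close();
+      connection.close();
+    }
+  }
+}
+</pre>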
+<h2>Performance, Concurrency, Etc.</h2>
+<p>
+  Each StreamingConnection writes data at the rate the underlying
+  FileSystem can accept it. If that is not sufficient, multiple
+  StreamingConnection objects can be created concurrently.
+</p>
+<p>
+  Each StreamingConnection can have at most 1 outstanding TransactionBatch
+  and each TransactionBatch may have at most 2 threads operating on it.
+  See <a href="TransactionBatch.html"><b>TransactionBatch</b></a>.
+</p>
+</body>
+
+</html>

http://git-wip-us.apache.org/repos/asf/hive/blob/67a8442b/streaming/src/test/org/apache/hive/streaming/TestDelimitedInputWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/org/apache/hive/streaming/TestDelimitedInputWriter.java b/streaming/src/test/org/apache/hive/streaming/TestDelimitedInputWriter.java
new file mode 100644
index 0000000..f0843a1
--- /dev/null
+++ b/streaming/src/test/org/apache/hive/streaming/TestDelimitedInputWriter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+import junit.framework.Assert;
+
+public class TestDelimitedInputWriter {
+  @Test
+  public void testFieldReordering() throws Exception {
+
+    ArrayList<String> colNames = Lists.newArrayList(new String[]{"col1", "col2", "col3", "col4", "col5"});
+    { // 1) test dropping fields - first, middle & last
+      String[] fieldNames = {null, "col2", null, "col4", null};
+      int[] mapping = DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
+      Assert.assertTrue(Arrays.equals(mapping, new int[]{-1, 1, -1, 3, -1}));
+    }
+
+    { // 2) test reordering
+      String[] fieldNames = {"col5", "col4", "col3", "col2", "col1"};
+      int[] mapping = DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
+      Assert.assertTrue(Arrays.equals(mapping, new int[]{4, 3, 2, 1, 0}));
+    }
+
+    { // 3) test bad field names
+      String[] fieldNames = {"xyz", "abc", "col3", "col4", "as"};
+      try {
+        DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
+        Assert.fail();
+      } catch (InvalidColumn e) {
+        // should throw
+      }
+    }
+
+    { // 4) test fewer field names than columns
+      String[] fieldNames = {"col3", "col4"};
+      int[] mapping = DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
+      Assert.assertTrue(Arrays.equals(mapping, new int[]{2, 3}));
+    }
+
+    { // 5) test extra field names
+      String[] fieldNames = {"col5", "col4", "col3", "col2", "col1", "col1"};
+      try {
+        DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
+        Assert.fail();
+      } catch (InvalidColumn e) {
+        // should throw
+      }
+    }
+  }
+}
