Fix checkstyle violations in storm-hive
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5fc4e9f0 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5fc4e9f0 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5fc4e9f0 Branch: refs/heads/master Commit: 5fc4e9f0bdf7a58852f7c27a1f8049e2bb3776a5 Parents: 0e409ec Author: Kishor Patil <[email protected]> Authored: Sun Apr 22 23:19:45 2018 -0400 Committer: Kishor Patil <[email protected]> Committed: Mon Apr 23 02:32:40 2018 -0400 ---------------------------------------------------------------------- external/storm-hive/pom.xml | 2 +- .../org/apache/storm/hive/bolt/HiveBolt.java | 139 ++++----- .../bolt/mapper/DelimitedRecordHiveMapper.java | 60 ++-- .../storm/hive/bolt/mapper/HiveMapper.java | 30 +- .../hive/bolt/mapper/JsonRecordHiveMapper.java | 61 ++-- .../apache/storm/hive/common/HiveOptions.java | 24 +- .../org/apache/storm/hive/common/HiveUtils.java | 60 ++-- .../apache/storm/hive/common/HiveWriter.java | 255 ++++++++-------- .../apache/storm/hive/trident/HiveState.java | 124 ++++---- .../storm/hive/trident/HiveStateFactory.java | 31 +- .../apache/storm/hive/trident/HiveUpdater.java | 23 +- .../apache/storm/hive/bolt/HiveSetupUtil.java | 143 +++++---- .../apache/storm/hive/bolt/TestHiveBolt.java | 302 +++++++++---------- .../storm/hive/common/TestHiveWriter.java | 205 ++++++------- 14 files changed, 683 insertions(+), 776 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/5fc4e9f0/external/storm-hive/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-hive/pom.xml b/external/storm-hive/pom.xml index 4637e7f..eb790f7 100644 --- a/external/storm-hive/pom.xml +++ b/external/storm-hive/pom.xml @@ -202,7 +202,7 @@ <artifactId>maven-checkstyle-plugin</artifactId> <!--Note - the version would be inherited--> <configuration> - 
<maxAllowedViolations>259</maxAllowedViolations> + <maxAllowedViolations>58</maxAllowedViolations> </configuration> </plugin> </plugins> http://git-wip-us.apache.org/repos/asf/storm/blob/5fc4e9f0/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java index 7a76888..cfabbd6 100644 --- a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java +++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java @@ -1,59 +1,52 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.hive.bolt; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hive.hcatalog.streaming.HiveEndPoint; import org.apache.hive.hcatalog.streaming.SerializationError; import org.apache.hive.hcatalog.streaming.StreamingException; +import org.apache.storm.Config; +import org.apache.storm.hive.common.HiveOptions; +import org.apache.storm.hive.common.HiveUtils; +import org.apache.storm.hive.common.HiveWriter; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Tuple; import org.apache.storm.utils.BatchHelper; import org.apache.storm.utils.TupleUtils; -import org.apache.storm.Config; -import org.apache.storm.hive.common.HiveWriter; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -import org.apache.storm.hive.common.HiveOptions; -import org.apache.storm.hive.common.HiveUtils; -import org.apache.hadoop.security.UserGroupInformation; 
import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.Timer; -import java.util.TimerTask; -import java.util.Map.Entry; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.List; -import java.io.IOException; - public class HiveBolt extends BaseRichBolt { private static final Logger LOG = LoggerFactory.getLogger(HiveBolt.class); + @VisibleForTesting + Map<HiveEndPoint, HiveWriter> allWriters; private OutputCollector collector; private HiveOptions options; private ExecutorService callTimeoutPool; @@ -63,36 +56,33 @@ public class HiveBolt extends BaseRichBolt { private BatchHelper batchHelper; private boolean tokenAuthEnabled; - @VisibleForTesting - Map<HiveEndPoint, HiveWriter> allWriters; - public HiveBolt(HiveOptions options) { this.options = options; } @Override - public void prepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector collector) { + public void prepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector collector) { try { tokenAuthEnabled = HiveUtils.isTokenAuthEnabled(conf); try { ugi = HiveUtils.authenticate(tokenAuthEnabled, options.getKerberosKeytab(), options.getKerberosPrincipal()); - } catch(HiveUtils.AuthenticationFailed ex) { + } catch (HiveUtils.AuthenticationFailed ex) { LOG.error("Hive kerberos authentication failed " + ex.getMessage(), ex); throw new IllegalArgumentException(ex); } this.collector = collector; this.batchHelper = new BatchHelper(options.getBatchSize(), collector); - allWriters = new ConcurrentHashMap<HiveEndPoint,HiveWriter>(); + allWriters = new ConcurrentHashMap<HiveEndPoint, HiveWriter>(); String timeoutName = "hive-bolt-%d"; this.callTimeoutPool = Executors.newFixedThreadPool(1, - new ThreadFactoryBuilder().setNameFormat(timeoutName).build()); 
+ new ThreadFactoryBuilder().setNameFormat(timeoutName).build()); sendHeartBeat.set(true); heartBeatTimer = new Timer(); setupHeartBeatTimer(); - } catch(Exception e) { + } catch (Exception e) { LOG.warn("unable to make connection to hive ", e); } } @@ -108,7 +98,7 @@ public class HiveBolt extends BaseRichBolt { batchHelper.addBatch(tuple); } - if(batchHelper.shouldFlush()) { + if (batchHelper.shouldFlush()) { flushAllWriters(true); LOG.info("acknowledging tuples after writers flushed "); batchHelper.ack(); @@ -116,11 +106,11 @@ public class HiveBolt extends BaseRichBolt { if (TupleUtils.isTick(tuple)) { retireIdleWriters(); } - } catch(SerializationError se) { + } catch (SerializationError se) { LOG.info("Serialization exception occurred, tuple is acknowledged but not written to Hive.", tuple); this.collector.reportError(se); collector.ack(tuple); - } catch(Exception e) { + } catch (Exception e) { batchHelper.fail(e); abortAndCloseWriters(); } @@ -147,13 +137,13 @@ public class HiveBolt extends BaseRichBolt { } } - ExecutorService toShutdown[] = {callTimeoutPool}; + ExecutorService toShutdown[] = { callTimeoutPool }; for (ExecutorService execService : toShutdown) { execService.shutdown(); try { while (!execService.isTerminated()) { execService.awaitTermination( - options.getCallTimeOut(), TimeUnit.MILLISECONDS); + options.getCallTimeOut(), TimeUnit.MILLISECONDS); } } catch (InterruptedException ex) { LOG.warn("shutdown interrupted on " + execService, ex); @@ -168,31 +158,33 @@ public class HiveBolt extends BaseRichBolt { @Override public Map<String, Object> getComponentConfiguration() { Map<String, Object> conf = super.getComponentConfiguration(); - if (conf == null) + if (conf == null) { conf = new Config(); + } - if (options.getTickTupleInterval() > 0) + if (options.getTickTupleInterval() > 0) { conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, options.getTickTupleInterval()); + } return conf; } private void setupHeartBeatTimer() { - 
if(options.getHeartBeatInterval()>0) { + if (options.getHeartBeatInterval() > 0) { heartBeatTimer.schedule(new TimerTask() { - @Override - public void run() { - try { - if (sendHeartBeat.get()) { - LOG.debug("Start sending heartbeat on all writers"); - sendHeartBeatOnAllWriters(); - setupHeartBeatTimer(); - } - } catch (Exception e) { - LOG.warn("Failed to heartbeat on HiveWriter ", e); + @Override + public void run() { + try { + if (sendHeartBeat.get()) { + LOG.debug("Start sending heartbeat on all writers"); + sendHeartBeatOnAllWriters(); + setupHeartBeatTimer(); } + } catch (Exception e) { + LOG.warn("Failed to heartbeat on HiveWriter ", e); } - }, options.getHeartBeatInterval() * 1000); + } + }, options.getHeartBeatInterval() * 1000); } } @@ -204,7 +196,7 @@ public class HiveBolt extends BaseRichBolt { void flushAllWriters(boolean rollToNext) throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure, InterruptedException { - for(HiveWriter writer: allWriters.values()) { + for (HiveWriter writer : allWriters.values()) { writer.flush(rollToNext); } } @@ -213,7 +205,7 @@ public class HiveBolt extends BaseRichBolt { try { abortAllWriters(); closeAllWriters(); - } catch(Exception ie) { + } catch (Exception ie) { LOG.warn("unable to close hive connections. 
", ie); } } @@ -222,11 +214,11 @@ public class HiveBolt extends BaseRichBolt { * Abort current Txn on all writers */ private void abortAllWriters() throws InterruptedException, StreamingException, HiveWriter.TxnBatchFailure { - for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) { + for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) { try { entry.getValue().abort(); } catch (Exception e) { - LOG.error("Failed to abort hive transaction batch, HiveEndPoint " + entry.getValue() +" due to exception ", e); + LOG.error("Failed to abort hive transaction batch, HiveEndPoint " + entry.getValue() + " due to exception ", e); } } } @@ -236,10 +228,10 @@ public class HiveBolt extends BaseRichBolt { */ private void closeAllWriters() { //1) Retire writers - for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) { + for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) { try { entry.getValue().close(); - } catch(Exception e) { + } catch (Exception e) { LOG.warn("unable to close writers. 
", e); } } @@ -251,14 +243,15 @@ public class HiveBolt extends BaseRichBolt { HiveWriter getOrCreateWriter(HiveEndPoint endPoint) throws HiveWriter.ConnectFailure, InterruptedException { try { - HiveWriter writer = allWriters.get( endPoint ); + HiveWriter writer = allWriters.get(endPoint); if (writer == null) { LOG.debug("Creating Writer to Hive end point : " + endPoint); writer = HiveUtils.makeHiveWriter(endPoint, callTimeoutPool, ugi, options, tokenAuthEnabled); if (allWriters.size() > (options.getMaxOpenConnections() - 1)) { - LOG.info("cached HiveEndPoint size {} exceeded maxOpenConnections {} ", allWriters.size(), options.getMaxOpenConnections()); + LOG.info("cached HiveEndPoint size {} exceeded maxOpenConnections {} ", allWriters.size(), + options.getMaxOpenConnections()); int retired = retireIdleWriters(); - if(retired==0) { + if (retired == 0) { retireEldestWriter(); } } @@ -279,7 +272,7 @@ public class HiveBolt extends BaseRichBolt { LOG.info("Attempting close eldest writers"); long oldestTimeStamp = System.currentTimeMillis(); HiveEndPoint eldest = null; - for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) { + for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) { if (entry.getValue().getLastUsed() < oldestTimeStamp) { eldest = entry.getKey(); oldestTimeStamp = entry.getValue().getLastUsed(); @@ -308,8 +301,8 @@ public class HiveBolt extends BaseRichBolt { long now = System.currentTimeMillis(); //1) Find retirement candidates - for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) { - if(now - entry.getValue().getLastUsed() > options.getIdleTimeout()) { + for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) { + if (now - entry.getValue().getLastUsed() > options.getIdleTimeout()) { ++count; retire(entry.getKey()); } http://git-wip-us.apache.org/repos/asf/storm/blob/5fc4e9f0/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/DelimitedRecordHiveMapper.java 
---------------------------------------------------------------------- diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/DelimitedRecordHiveMapper.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/DelimitedRecordHiveMapper.java index 48cd5ff..d67ec67 100644 --- a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/DelimitedRecordHiveMapper.java +++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/DelimitedRecordHiveMapper.java @@ -1,40 +1,34 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hive.bolt.mapper; import com.google.common.annotations.VisibleForTesting; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.trident.tuple.TridentTuple; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; import org.apache.hive.hcatalog.streaming.DelimitedInputWriter; import org.apache.hive.hcatalog.streaming.HiveEndPoint; import org.apache.hive.hcatalog.streaming.RecordWriter; import org.apache.hive.hcatalog.streaming.StreamingException; import org.apache.hive.hcatalog.streaming.TransactionBatch; - -import java.util.List; -import java.util.ArrayList; -import java.util.Date; -import java.text.SimpleDateFormat; -import java.io.IOException; +import org.apache.storm.trident.tuple.TridentTuple; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class DelimitedRecordHiveMapper implements HiveMapper { private static final Logger LOG = LoggerFactory.getLogger(DelimitedRecordHiveMapper.class); @@ -62,7 +56,7 @@ public class DelimitedRecordHiveMapper implements HiveMapper { return this; } - public DelimitedRecordHiveMapper withFieldDelimiter(String delimiter){ + public DelimitedRecordHiveMapper withFieldDelimiter(String delimiter) { this.fieldDelimiter = delimiter; return this; } @@ -76,7 +70,7 @@ public class DelimitedRecordHiveMapper implements HiveMapper { @Override public RecordWriter createRecordWriter(HiveEndPoint endPoint) throws StreamingException, IOException, ClassNotFoundException { - return new DelimitedInputWriter(columnNames, fieldDelimiter,endPoint); + return new DelimitedInputWriter(columnNames, fieldDelimiter, endPoint); } @Override @@ 
-88,8 +82,8 @@ public class DelimitedRecordHiveMapper implements HiveMapper { @Override public List<String> mapPartitions(Tuple tuple) { List<String> partitionList = new ArrayList<String>(); - if(this.partitionFields != null) { - for(String field: this.partitionFields) { + if (this.partitionFields != null) { + for (String field : this.partitionFields) { partitionList.add(tuple.getStringByField(field)); } } @@ -102,8 +96,8 @@ public class DelimitedRecordHiveMapper implements HiveMapper { @Override public byte[] mapRecord(Tuple tuple) { StringBuilder builder = new StringBuilder(); - if(this.columnFields != null) { - for(String field: this.columnFields) { + if (this.columnFields != null) { + for (String field : this.columnFields) { builder.append(tuple.getValueByField(field)); builder.append(fieldDelimiter); } @@ -114,8 +108,8 @@ public class DelimitedRecordHiveMapper implements HiveMapper { @Override public List<String> mapPartitions(TridentTuple tuple) { List<String> partitionList = new ArrayList<String>(); - if(this.partitionFields != null) { - for(String field: this.partitionFields) { + if (this.partitionFields != null) { + for (String field : this.partitionFields) { partitionList.add(tuple.getStringByField(field)); } } @@ -128,8 +122,8 @@ public class DelimitedRecordHiveMapper implements HiveMapper { @Override public byte[] mapRecord(TridentTuple tuple) { StringBuilder builder = new StringBuilder(); - if(this.columnFields != null) { - for(String field: this.columnFields) { + if (this.columnFields != null) { + for (String field : this.columnFields) { builder.append(tuple.getValueByField(field)); builder.append(fieldDelimiter); } http://git-wip-us.apache.org/repos/asf/storm/blob/5fc4e9f0/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/HiveMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/HiveMapper.java 
b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/HiveMapper.java index a7a79fb..cc29392 100644 --- a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/HiveMapper.java +++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/HiveMapper.java @@ -1,33 +1,27 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. 
*/ + package org.apache.storm.hive.bolt.mapper; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.trident.tuple.TridentTuple; +import java.io.IOException; +import java.io.Serializable; import java.util.List; import org.apache.hive.hcatalog.streaming.HiveEndPoint; import org.apache.hive.hcatalog.streaming.RecordWriter; -import org.apache.hive.hcatalog.streaming.TransactionBatch; import org.apache.hive.hcatalog.streaming.StreamingException; -import java.io.Serializable; - -import java.io.IOException; +import org.apache.hive.hcatalog.streaming.TransactionBatch; +import org.apache.storm.trident.tuple.TridentTuple; +import org.apache.storm.tuple.Tuple; /** * Maps a <code>org.apache.storm.tuple.Tupe</code> object http://git-wip-us.apache.org/repos/asf/storm/blob/5fc4e9f0/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/JsonRecordHiveMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/JsonRecordHiveMapper.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/JsonRecordHiveMapper.java index 0028b87..3a43b7c 100644 --- a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/JsonRecordHiveMapper.java +++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/JsonRecordHiveMapper.java @@ -1,41 +1,34 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.hive.bolt.mapper; +package org.apache.storm.hive.bolt.mapper; -import org.apache.storm.tuple.Fields; -import org.apache.storm.trident.tuple.TridentTuple; -import org.apache.storm.tuple.Tuple; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; import org.apache.hive.hcatalog.streaming.HiveEndPoint; import org.apache.hive.hcatalog.streaming.RecordWriter; import org.apache.hive.hcatalog.streaming.StreamingException; import org.apache.hive.hcatalog.streaming.StrictJsonWriter; import org.apache.hive.hcatalog.streaming.TransactionBatch; +import org.apache.storm.trident.tuple.TridentTuple; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; import org.json.simple.JSONObject; - -import java.util.List; -import java.util.ArrayList; -import java.util.Date; -import java.text.SimpleDateFormat; 
-import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class JsonRecordHiveMapper implements HiveMapper { private static final Logger LOG = LoggerFactory.getLogger(JsonRecordHiveMapper.class); @@ -78,8 +71,8 @@ public class JsonRecordHiveMapper implements HiveMapper { @Override public List<String> mapPartitions(Tuple tuple) { List<String> partitionList = new ArrayList<String>(); - if(this.partitionFields != null) { - for(String field: this.partitionFields) { + if (this.partitionFields != null) { + for (String field : this.partitionFields) { partitionList.add(tuple.getStringByField(field)); } } @@ -92,9 +85,9 @@ public class JsonRecordHiveMapper implements HiveMapper { @Override public byte[] mapRecord(Tuple tuple) { JSONObject obj = new JSONObject(); - if(this.columnFields != null) { - for(String field: this.columnFields) { - obj.put(field,tuple.getValueByField(field)); + if (this.columnFields != null) { + for (String field : this.columnFields) { + obj.put(field, tuple.getValueByField(field)); } } return obj.toJSONString().getBytes(); @@ -103,8 +96,8 @@ public class JsonRecordHiveMapper implements HiveMapper { @Override public List<String> mapPartitions(TridentTuple tuple) { List<String> partitionList = new ArrayList<String>(); - if(this.partitionFields != null) { - for(String field: this.partitionFields) { + if (this.partitionFields != null) { + for (String field : this.partitionFields) { partitionList.add(tuple.getStringByField(field)); } } @@ -117,9 +110,9 @@ public class JsonRecordHiveMapper implements HiveMapper { @Override public byte[] mapRecord(TridentTuple tuple) { JSONObject obj = new JSONObject(); - if(this.columnFields != null) { - for(String field: this.columnFields) { - obj.put(field,tuple.getValueByField(field)); + if (this.columnFields != null) { + for (String field : this.columnFields) { + obj.put(field, tuple.getValueByField(field)); } } return obj.toJSONString().getBytes(); 
http://git-wip-us.apache.org/repos/asf/storm/blob/5fc4e9f0/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java ---------------------------------------------------------------------- diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java index e80f5d7..4a91da1 100644 --- a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java +++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java @@ -1,25 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.hive.common; import java.io.Serializable; - import org.apache.storm.hive.bolt.mapper.HiveMapper; public class HiveOptions implements Serializable { @@ -43,15 +36,14 @@ public class HiveOptions implements Serializable { protected String kerberosKeytab; protected Integer tickTupleInterval = DEFAULT_TICK_TUPLE_INTERVAL_SECS; - public HiveOptions(String metaStoreURI,String databaseName,String tableName,HiveMapper mapper) { + public HiveOptions(String metaStoreURI, String databaseName, String tableName, HiveMapper mapper) { this.metaStoreURI = metaStoreURI; this.databaseName = databaseName; this.tableName = tableName; this.mapper = mapper; } - public HiveOptions withTickTupleInterval(Integer tickInterval) - { + public HiveOptions withTickTupleInterval(Integer tickInterval) { this.tickTupleInterval = tickInterval; return this; } http://git-wip-us.apache.org/repos/asf/storm/blob/5fc4e9f0/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java ---------------------------------------------------------------------- diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java index 26beee0..7681f91 100644 --- a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java +++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java @@ -1,23 +1,22 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p> * http://www.apache.org/licenses/LICENSE-2.0 * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. 
*/ package org.apache.storm.hive.common; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hive.hcatalog.streaming.ConnectionError; @@ -26,12 +25,6 @@ import org.apache.storm.hive.security.AutoHive; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutorService; - import static org.apache.storm.Config.TOPOLOGY_AUTO_CREDENTIALS; public class HiveUtils { @@ -44,16 +37,19 @@ public class HiveUtils { return new HiveEndPoint(options.getMetaStoreURI(), options.getDatabaseName(), options.getTableName(), partitionVals); } - public static HiveWriter makeHiveWriter(HiveEndPoint endPoint, ExecutorService callTimeoutPool, UserGroupInformation ugi, HiveOptions options, boolean tokenAuthEnabled) - throws HiveWriter.ConnectFailure, InterruptedException { + public static HiveWriter makeHiveWriter(HiveEndPoint endPoint, ExecutorService callTimeoutPool, UserGroupInformation ugi, + HiveOptions options, boolean tokenAuthEnabled) + throws HiveWriter.ConnectFailure, InterruptedException { return new HiveWriter(endPoint, options.getTxnsPerBatch(), options.getAutoCreatePartitions(), - options.getCallTimeOut(), callTimeoutPool, options.getMapper(), ugi, tokenAuthEnabled); + options.getCallTimeOut(), callTimeoutPool, options.getMapper(), ugi, tokenAuthEnabled); } - public static synchronized UserGroupInformation authenticate(boolean isTokenAuthEnabled, String keytab, String principal) throws AuthenticationFailed { + public static synchronized UserGroupInformation authenticate(boolean isTokenAuthEnabled, String keytab, String principal) throws + AuthenticationFailed { - if (isTokenAuthEnabled) + if (isTokenAuthEnabled) { return 
getCurrentUser(principal); + } boolean kerberosEnabled = false; @@ -70,7 +66,7 @@ public class HiveUtils { if (!(kfile.isFile() && kfile.canRead())) { throw new IllegalArgumentException("The keyTab file: " + keytab + " is nonexistent or can't read. " - + "Please specify a readable keytab file for Kerberos auth."); + + "Please specify a readable keytab file for Kerberos auth."); } try { @@ -91,12 +87,6 @@ public class HiveUtils { } - public static class AuthenticationFailed extends Exception { - public AuthenticationFailed(String reason, Exception cause) { - super("Kerberos Authentication Failed. " + reason, cause); - } - } - public static void logAllHiveEndPoints(Map<HiveEndPoint, HiveWriter> allWriters) { for (Map.Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) { LOG.info("cached writers {} ", entry.getValue()); @@ -104,10 +94,10 @@ public class HiveUtils { } public static boolean isTokenAuthEnabled(Map<String, Object> conf) { - return conf.get(TOPOLOGY_AUTO_CREDENTIALS) != null && (((List) conf.get(TOPOLOGY_AUTO_CREDENTIALS)).contains(AutoHive.class.getName())); + return conf.get(TOPOLOGY_AUTO_CREDENTIALS) != null && + (((List) conf.get(TOPOLOGY_AUTO_CREDENTIALS)).contains(AutoHive.class.getName())); } - private static UserGroupInformation getCurrentUser(String principal) throws AuthenticationFailed { try { return UserGroupInformation.getCurrentUser(); @@ -115,4 +105,10 @@ public class HiveUtils { throw new AuthenticationFailed("Login failed for principal " + principal, e); } } + + public static class AuthenticationFailed extends Exception { + public AuthenticationFailed(String reason, Exception cause) { + super("Kerberos Authentication Failed. 
" + reason, cause); + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/5fc4e9f0/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java ---------------------------------------------------------------------- diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java index e9a30fe..bc0ee75 100644 --- a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java +++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java @@ -1,23 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.hive.common; +import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.concurrent.Callable; @@ -26,14 +21,17 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; - -import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hive.hcatalog.streaming.*; +import org.apache.hive.hcatalog.streaming.HiveEndPoint; +import org.apache.hive.hcatalog.streaming.RecordWriter; +import org.apache.hive.hcatalog.streaming.SerializationError; +import org.apache.hive.hcatalog.streaming.StreamingConnection; +import org.apache.hive.hcatalog.streaming.StreamingException; +import org.apache.hive.hcatalog.streaming.StreamingIOFailure; +import org.apache.hive.hcatalog.streaming.TransactionBatch; import org.apache.storm.hive.bolt.mapper.HiveMapper; import org.apache.storm.tuple.Tuple; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,9 +48,9 @@ public class HiveWriter { private final ExecutorService callTimeoutPool; private final long callTimeout; private final Object txnBatchLock = new Object(); + protected boolean closed; // flag indicating HiveWriter was closed private TransactionBatch txnBatch; private long lastUsed; // time of last flush on this writer - protected boolean closed; // flag indicating HiveWriter was closed private boolean autoCreatePartitions; private UserGroupInformation ugi; private int 
totalRecords = 0; @@ -74,37 +72,52 @@ public class HiveWriter { this.txnBatch = nextTxnBatch(recordWriter); this.closed = false; this.lastUsed = System.currentTimeMillis(); - } catch(InterruptedException e) { + } catch (InterruptedException e) { throw e; - } catch(RuntimeException e) { + } catch (RuntimeException e) { throw e; - } catch(Exception e) { + } catch (Exception e) { throw new ConnectFailure(endPoint, e); } } - public RecordWriter getRecordWriter(final HiveMapper mapper, final boolean tokenAuthEnabled) throws Exception { - if (!tokenAuthEnabled) - return mapper.createRecordWriter(endPoint); + /** + * If the current thread has been interrupted, then throws an + * exception. + * @throws InterruptedException + */ + private static void checkAndThrowInterruptedException() + throws InterruptedException { + if (Thread.currentThread().interrupted()) { + throw new InterruptedException("Timed out before Hive call was made. " + + "Your callTimeout might be set too low or Hive calls are " + + "taking too long."); + } + } + + public RecordWriter getRecordWriter(final HiveMapper mapper, final boolean tokenAuthEnabled) throws Exception { + if (!tokenAuthEnabled) { + return mapper.createRecordWriter(endPoint); + } try { - return ugi.doAs ( - new PrivilegedExceptionAction<RecordWriter>() { - @Override - public RecordWriter run() throws StreamingException, IOException, ClassNotFoundException { - return mapper.createRecordWriter(endPoint); - } + return ugi.doAs( + new PrivilegedExceptionAction<RecordWriter>() { + @Override + public RecordWriter run() throws StreamingException, IOException, ClassNotFoundException { + return mapper.createRecordWriter(endPoint); } + } ); } catch (Exception e) { throw new ConnectFailure(endPoint, e); } } - - private HiveConf createHiveConf(String metaStoreURI, boolean tokenAuthEnabled) { - if (!tokenAuthEnabled) + private HiveConf createHiveConf(String metaStoreURI, boolean tokenAuthEnabled) { + if (!tokenAuthEnabled) { return null; + } 
HiveConf hcatConf = new HiveConf(); hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreURI); @@ -114,9 +127,9 @@ public class HiveWriter { @Override public String toString() { - return "{ " - + "endPoint = " + endPoint.toString() - + ", TransactionBatch = " + txnBatch.toString() + " }"; + return "{ " + + "endPoint = " + endPoint.toString() + + ", TransactionBatch = " + txnBatch.toString() + " }"; } /** @@ -135,18 +148,18 @@ public class HiveWriter { try { LOG.debug("Writing event to {}", endPoint); callWithTimeout(new CallRunner<Void>() { - @Override - public Void call() throws StreamingException, InterruptedException { - txnBatch.write(record); - totalRecords++; - return null; - } - }); - } catch(SerializationError se) { + @Override + public Void call() throws StreamingException, InterruptedException { + txnBatch.write(record); + totalRecords++; + return null; + } + }); + } catch (SerializationError se) { throw new SerializationError(endPoint.toString() + " SerializationError", se); - } catch(StreamingException e) { + } catch (StreamingException e) { throw new WriteFailure(endPoint, txnBatch.getCurrentTxnId(), e); - } catch(TimeoutException e) { + } catch (TimeoutException e) { throw new WriteFailure(endPoint, txnBatch.getCurrentTxnId(), e); } } @@ -161,13 +174,13 @@ public class HiveWriter { // if there are no records do not call flush if (totalRecords <= 0) return; try { - synchronized(txnBatchLock) { + synchronized (txnBatchLock) { commitTxn(); nextTxn(rollToNext); totalRecords = 0; lastUsed = System.currentTimeMillis(); } - } catch(StreamingException e) { + } catch (StreamingException e) { throw new TxnFailure(txnBatch, e); } } @@ -177,24 +190,24 @@ public class HiveWriter { */ public void heartBeat() throws InterruptedException { // 1) schedule the heartbeat on one thread in pool - synchronized(txnBatchLock) { + synchronized (txnBatchLock) { try { callWithTimeout(new CallRunner<Void>() { - @Override - public Void call() throws Exception { - try { - 
LOG.info("Sending heartbeat on batch " + txnBatch); - txnBatch.heartbeat(); - } catch (StreamingException e) { - LOG.warn("Heartbeat error on batch " + txnBatch, e); - } - return null; + @Override + public Void call() throws Exception { + try { + LOG.info("Sending heartbeat on batch " + txnBatch); + txnBatch.heartbeat(); + } catch (StreamingException e) { + LOG.warn("Heartbeat error on batch " + txnBatch, e); } - }); + return null; + } + }); } catch (InterruptedException e) { throw e; } catch (Exception e) { - LOG.warn("Unable to send heartbeat on Txn Batch " + txnBatch, e); + LOG.warn("Unable to send heartbeat on Txn Batch " + txnBatch, e); // Suppressing exceptions as we don't care for errors on heartbeats } } @@ -212,10 +225,11 @@ public class HiveWriter { * Flush and Close current transactionBatch. */ public void flushAndClose() throws TxnBatchFailure, TxnFailure, CommitFailure, - IOException, InterruptedException { + IOException, InterruptedException { flush(false); close(); } + /** * Close the Transaction Batch and connection * @throws IOException @@ -231,28 +245,28 @@ public class HiveWriter { LOG.info("Closing connection to end point : {}", endPoint); try { callWithTimeout(new CallRunner<Void>() { - @Override - public Void call() throws Exception { - connection.close(); // could block - return null; - } - }); - } catch(Exception e) { + @Override + public Void call() throws Exception { + connection.close(); // could block + return null; + } + }); + } catch (Exception e) { LOG.warn("Error closing connection to EndPoint : " + endPoint, e); // Suppressing exceptions as we don't care for errors on connection close } } private void commitTxn() throws CommitFailure, InterruptedException { - LOG.debug("Committing Txn id {} to {}", txnBatch.getCurrentTxnId() , endPoint); + LOG.debug("Committing Txn id {} to {}", txnBatch.getCurrentTxnId(), endPoint); try { callWithTimeout(new CallRunner<Void>() { - @Override - public Void call() throws Exception { - 
txnBatch.commit(); // could block - return null; - } - }); + @Override + public Void call() throws Exception { + txnBatch.commit(); // could block + return null; + } + }); } catch (StreamingException e) { throw new CommitFailure(endPoint, txnBatch.getCurrentTxnId(), e); } catch (TimeoutException e) { @@ -264,15 +278,16 @@ public class HiveWriter { StreamingConnection newConnection(final UserGroupInformation ugi, final boolean tokenAuthEnabled) throws InterruptedException, ConnectFailure { try { - return callWithTimeout(new CallRunner<StreamingConnection>() { - @Override - public StreamingConnection call() throws Exception { - return endPoint.newConnection(autoCreatePartitions, createHiveConf(endPoint.metaStoreUri, tokenAuthEnabled) , ugi); // could block - } - }); - } catch(StreamingException e) { + return callWithTimeout(new CallRunner<StreamingConnection>() { + @Override + public StreamingConnection call() throws Exception { + return endPoint + .newConnection(autoCreatePartitions, createHiveConf(endPoint.metaStoreUri, tokenAuthEnabled), ugi); // could block + } + }); + } catch (StreamingException e) { throw new ConnectFailure(endPoint, e); - } catch(TimeoutException e) { + } catch (TimeoutException e) { throw new ConnectFailure(endPoint, e); } } @@ -290,30 +305,30 @@ public class HiveWriter { }); batch.beginNextTransaction(); LOG.debug("Acquired {}. 
Switching to first txn", batch); - } catch(TimeoutException e) { + } catch (TimeoutException e) { throw new TxnBatchFailure(endPoint, e); - } catch(StreamingException e) { + } catch (StreamingException e) { throw new TxnBatchFailure(endPoint, e); } return batch; } - private void closeTxnBatch() throws InterruptedException { + private void closeTxnBatch() throws InterruptedException { try { LOG.debug("Closing Txn Batch {}", txnBatch); callWithTimeout(new CallRunner<Void>() { - @Override - public Void call() throws Exception { - if(txnBatch != null) { - txnBatch.close(); // could block - } - return null; + @Override + public Void call() throws Exception { + if (txnBatch != null) { + txnBatch.close(); // could block } - }); - } catch(InterruptedException e) { + return null; + } + }); + } catch (InterruptedException e) { throw e; - } catch(Exception e) { - LOG.warn("Error closing txn batch "+ txnBatch, e); + } catch (Exception e) { + LOG.warn("Error closing txn batch " + txnBatch, e); } } @@ -322,13 +337,12 @@ public class HiveWriter { * @throws StreamingException if could not get new Transaction Batch, or switch to next Txn */ public void abort() throws StreamingException, TxnBatchFailure, InterruptedException { - synchronized(txnBatchLock) { + synchronized (txnBatchLock) { abortTxn(); nextTxn(true); // roll to next } } - /** * Aborts current Txn in the txnBatch. 
*/ @@ -336,12 +350,12 @@ public class HiveWriter { LOG.info("Aborting Txn id {} on End Point {}", txnBatch.getCurrentTxnId(), endPoint); try { callWithTimeout(new CallRunner<Void>() { - @Override - public Void call() throws StreamingException, InterruptedException { - txnBatch.abort(); // could block - return null; - } - }); + @Override + public Void call() throws StreamingException, InterruptedException { + txnBatch.abort(); // could block + return null; + } + }); } catch (InterruptedException e) { throw e; } catch (TimeoutException e) { @@ -352,40 +366,25 @@ public class HiveWriter { } } - /** * if there are remainingTransactions in current txnBatch, begins nextTransactions * otherwise creates new txnBatch. * @param rollToNext */ private void nextTxn(boolean rollToNext) throws StreamingException, InterruptedException, TxnBatchFailure { - if(txnBatch.remainingTransactions() == 0) { + if (txnBatch.remainingTransactions() == 0) { closeTxnBatch(); txnBatch = null; - if(rollToNext) { + if (rollToNext) { txnBatch = nextTxnBatch(recordWriter); } - } else if(rollToNext) { + } else if (rollToNext) { LOG.debug("Switching to next Txn for {}", endPoint); txnBatch.beginNextTransaction(); // does not block } } /** - * If the current thread has been interrupted, then throws an - * exception. - * @throws InterruptedException - */ - private static void checkAndThrowInterruptedException() - throws InterruptedException { - if (Thread.currentThread().interrupted()) { - throw new InterruptedException("Timed out before Hive call was made. " - + "Your callTimeout might be set too low or Hive calls are " - + "taking too long."); - } - } - - /** * Execute the callable on a separate thread and wait for the completion * for the specified amount of time in milliseconds. 
In case of timeout * cancel the callable and throw an IOException @@ -393,11 +392,11 @@ public class HiveWriter { private <T> T callWithTimeout(final CallRunner<T> callRunner) throws TimeoutException, StreamingException, InterruptedException { Future<T> future = callTimeoutPool.submit(new Callable<T>() { - @Override - public T call() throws Exception { - return callRunner.call(); - } - }); + @Override + public T call() throws Exception { + return callRunner.call(); + } + }); try { if (callTimeout > 0) { return future.get(callTimeout, TimeUnit.MILLISECONDS); @@ -431,7 +430,7 @@ public class HiveWriter { private byte[] generateRecord(Tuple tuple) { StringBuilder buf = new StringBuilder(); - for (Object o: tuple.getValues()) { + for (Object o : tuple.getValues()) { buf.append(o); buf.append(","); } http://git-wip-us.apache.org/repos/asf/storm/blob/5fc4e9f0/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java ---------------------------------------------------------------------- diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java index a7685f0..a698e24 100644 --- a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java +++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java @@ -1,50 +1,43 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.hive.trident; -import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.state.State; -import org.apache.storm.trident.tuple.TridentTuple; -import org.apache.storm.task.IMetricsContext; -import org.apache.storm.topology.FailedException; -import org.apache.storm.hive.common.HiveWriter; -import org.apache.hive.hcatalog.streaming.*; -import org.apache.storm.hive.common.HiveOptions; -import org.apache.storm.hive.common.HiveUtils; -import org.apache.hadoop.security.UserGroupInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.Map.Entry; import java.util.Timer; import java.util.TimerTask; -import java.util.Map.Entry; -import com.google.common.util.concurrent.ThreadFactoryBuilder; 
+import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hive.hcatalog.streaming.HiveEndPoint; +import org.apache.hive.hcatalog.streaming.StreamingException; +import org.apache.storm.hive.common.HiveOptions; +import org.apache.storm.hive.common.HiveUtils; +import org.apache.storm.hive.common.HiveWriter; +import org.apache.storm.task.IMetricsContext; +import org.apache.storm.topology.FailedException; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.state.State; +import org.apache.storm.trident.tuple.TridentTuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class HiveState implements State { private static final Logger LOG = LoggerFactory.getLogger(HiveState.class); @@ -73,24 +66,24 @@ public class HiveState implements State { public void commit(Long txId) { } - public void prepare(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { + public void prepare(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { try { tokenAuthEnabled = HiveUtils.isTokenAuthEnabled(conf); try { ugi = HiveUtils.authenticate(tokenAuthEnabled, options.getKerberosKeytab(), options.getKerberosPrincipal()); - } catch(HiveUtils.AuthenticationFailed ex) { + } catch (HiveUtils.AuthenticationFailed ex) { LOG.error("Hive kerberos authentication failed " + ex.getMessage(), ex); throw new IllegalArgumentException(ex); } - allWriters = new ConcurrentHashMap<HiveEndPoint,HiveWriter>(); + allWriters = new ConcurrentHashMap<HiveEndPoint, HiveWriter>(); String timeoutName = "hive-bolt-%d"; this.callTimeoutPool = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat(timeoutName).build()); - heartBeatTimer= new 
Timer(); + heartBeatTimer = new Timer(); setupHeartBeatTimer(); - } catch(Exception e) { - LOG.warn("unable to make connection to hive ",e); + } catch (Exception e) { + LOG.warn("unable to make connection to hive ", e); } } @@ -99,7 +92,7 @@ public class HiveState implements State { writeTuples(tuples); } catch (Exception e) { abortAndCloseWriters(); - LOG.warn("hive streaming failed.",e); + LOG.warn("hive streaming failed.", e); throw new FailedException(e); } } @@ -112,7 +105,7 @@ public class HiveState implements State { HiveWriter writer = getOrCreateWriter(endPoint); writer.write(options.getMapper().mapRecord(tuple)); currentBatchSize++; - if(currentBatchSize >= options.getBatchSize()) { + if (currentBatchSize >= options.getBatchSize()) { flushAllWriters(); currentBatchSize = 0; } @@ -124,7 +117,7 @@ public class HiveState implements State { sendHeartBeat = false; abortAllWriters(); closeAllWriters(); - } catch(Exception ie) { + } catch (Exception ie) { LOG.warn("unable to close hive connections. 
", ie); } } @@ -133,7 +126,7 @@ public class HiveState implements State { * Abort current Txn on all writers */ private void abortAllWriters() throws InterruptedException, StreamingException, HiveWriter.TxnBatchFailure { - for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) { + for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) { entry.getValue().abort(); } } @@ -145,7 +138,7 @@ public class HiveState implements State { */ private void closeAllWriters() throws InterruptedException, IOException { //1) Retire writers - for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) { + for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) { entry.getValue().close(); } //2) Clear cache @@ -153,27 +146,27 @@ public class HiveState implements State { } private void setupHeartBeatTimer() { - if(options.getHeartBeatInterval()>0) { + if (options.getHeartBeatInterval() > 0) { heartBeatTimer.schedule(new TimerTask() { - @Override - public void run() { - try { - if (sendHeartBeat) { - LOG.debug("Start sending heartbeat on all writers"); - sendHeartBeatOnAllWriters(); - setupHeartBeatTimer(); - } - } catch (Exception e) { - LOG.warn("Failed to heartbeat on HiveWriter ", e); + @Override + public void run() { + try { + if (sendHeartBeat) { + LOG.debug("Start sending heartbeat on all writers"); + sendHeartBeatOnAllWriters(); + setupHeartBeatTimer(); } + } catch (Exception e) { + LOG.warn("Failed to heartbeat on HiveWriter ", e); } - }, options.getHeartBeatInterval() * 1000); + } + }, options.getHeartBeatInterval() * 1000); } } private void flushAllWriters() throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure, InterruptedException { - for(HiveWriter writer: allWriters.values()) { + for (HiveWriter writer : allWriters.values()) { writer.flush(true); } } @@ -187,13 +180,13 @@ public class HiveState implements State { private HiveWriter getOrCreateWriter(HiveEndPoint endPoint) throws 
HiveWriter.ConnectFailure, InterruptedException { try { - HiveWriter writer = allWriters.get( endPoint ); - if( writer == null ) { + HiveWriter writer = allWriters.get(endPoint); + if (writer == null) { LOG.info("Creating Writer to Hive end point : " + endPoint); writer = HiveUtils.makeHiveWriter(endPoint, callTimeoutPool, ugi, options, tokenAuthEnabled); - if(allWriters.size() > (options.getMaxOpenConnections() - 1)){ + if (allWriters.size() > (options.getMaxOpenConnections() - 1)) { int retired = retireIdleWriters(); - if(retired==0) { + if (retired == 0) { retireEldestWriter(); } } @@ -208,15 +201,14 @@ public class HiveState implements State { } - /** * Locate writer that has not been used for longest time and retire it */ private void retireEldestWriter() { long oldestTimeStamp = System.currentTimeMillis(); HiveEndPoint eldest = null; - for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) { - if(entry.getValue().getLastUsed() < oldestTimeStamp) { + for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) { + if (entry.getValue().getLastUsed() < oldestTimeStamp) { eldest = entry.getKey(); oldestTimeStamp = entry.getValue().getLastUsed(); } @@ -244,19 +236,19 @@ public class HiveState implements State { ArrayList<HiveEndPoint> retirees = new ArrayList<HiveEndPoint>(); //1) Find retirement candidates - for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) { - if(now - entry.getValue().getLastUsed() > options.getIdleTimeout()) { + for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) { + if (now - entry.getValue().getLastUsed() > options.getIdleTimeout()) { ++count; retirees.add(entry.getKey()); } } //2) Retire them - for(HiveEndPoint ep : retirees) { + for (HiveEndPoint ep : retirees) { try { LOG.info("Closing idle Writer to Hive end point : {}", ep); allWriters.remove(ep).flushAndClose(); } catch (IOException e) { - LOG.warn("Failed to close writer for end point: {}. 
Error: "+ ep, e); + LOG.warn("Failed to close writer for end point: {}. Error: " + ep, e); } catch (InterruptedException e) { LOG.warn("Interrupted when attempting to close writer for end point: " + ep, e); Thread.currentThread().interrupt(); @@ -285,13 +277,13 @@ public class HiveState implements State { } } - ExecutorService toShutdown[] = {callTimeoutPool}; + ExecutorService toShutdown[] = { callTimeoutPool }; for (ExecutorService execService : toShutdown) { execService.shutdown(); try { while (!execService.isTerminated()) { execService.awaitTermination( - options.getCallTimeOut(), TimeUnit.MILLISECONDS); + options.getCallTimeOut(), TimeUnit.MILLISECONDS); } } catch (InterruptedException ex) { LOG.warn("shutdown interrupted on " + execService, ex); http://git-wip-us.apache.org/repos/asf/storm/blob/5fc4e9f0/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java ---------------------------------------------------------------------- diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java index 6659825..68722d1 100644 --- a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java +++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java @@ -1,40 +1,33 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.hive.trident; +import java.util.Map; +import org.apache.storm.hive.common.HiveOptions; import org.apache.storm.task.IMetricsContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.storm.trident.state.State; import org.apache.storm.trident.state.StateFactory; -import org.apache.storm.hive.common.HiveOptions; - -import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class HiveStateFactory implements StateFactory { private static final Logger LOG = LoggerFactory.getLogger(HiveStateFactory.class); private HiveOptions options; - public HiveStateFactory(){} + public HiveStateFactory() {} - public HiveStateFactory withOptions(HiveOptions options){ + public HiveStateFactory withOptions(HiveOptions options) { this.options = options; return this; } http://git-wip-us.apache.org/repos/asf/storm/blob/5fc4e9f0/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java 
---------------------------------------------------------------------- diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java index 062f7fb..36479d9 100644 --- a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java +++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java @@ -1,30 +1,23 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.hive.trident; +import java.util.List; import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.state.BaseStateUpdater; import org.apache.storm.trident.tuple.TridentTuple; -import java.util.List; - -public class HiveUpdater extends BaseStateUpdater<HiveState>{ +public class HiveUpdater extends BaseStateUpdater<HiveState> { @Override public void updateState(HiveState state, List<TridentTuple> tuples, TridentCollector collector) { state.updateState(tuples, collector); http://git-wip-us.apache.org/repos/asf/storm/blob/5fc4e9f0/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveSetupUtil.java ---------------------------------------------------------------------- diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveSetupUtil.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveSetupUtil.java index d492819..6646390 100644 --- a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveSetupUtil.java +++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveSetupUtil.java @@ -1,23 +1,26 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.hive.bolt; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; @@ -42,57 +45,7 @@ import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.thrift.TException; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - public class HiveSetupUtil { - public static class RawFileSystem extends RawLocalFileSystem { - private static final URI NAME; - static { - try { - NAME = new URI("raw:///"); - } catch (URISyntaxException se) { - throw new IllegalArgumentException("bad uri", se); - } - } - - 
@Override - public URI getUri() { - return NAME; - } - - @Override - public FileStatus getFileStatus(Path path) throws IOException { - File file = pathToFile(path); - if (!file.exists()) { - throw new FileNotFoundException("Can't find " + path); - } - // get close enough - short mod = 0; - if (file.canRead()) { - mod |= 0444; - } - if (file.canWrite()) { - mod |= 0200; - } - if (file.canExecute()) { - mod |= 0111; - } - ShimLoader.getHadoopShims(); - return new FileStatus(file.length(), file.isDirectory(), 1, 1024, - file.lastModified(), file.lastModified(), - FsPermission.createImmutable(mod), "owen", "users", path); - } - } - private final static String txnMgr = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"; public static HiveConf getHiveConf() { @@ -126,7 +79,7 @@ public class HiveSetupUtil { sd.setCols(getTableColumns(colNames, colTypes)); sd.setNumBuckets(1); sd.setLocation(dbLocation + Path.SEPARATOR + tableName); - if(partNames!=null && partNames.length!=0) { + if (partNames != null && partNames.length != 0) { tbl.setPartitionKeys(getPartitionKeys(partNames)); } @@ -146,10 +99,10 @@ public class HiveSetupUtil { tbl.setParameters(tableParams); client.createTable(tbl); try { - if(partVals!=null && partVals.size() > 0) { + if (partVals != null && partVals.size() > 0) { addPartition(client, tbl, partVals); } - } catch(AlreadyExistsException e) { + } catch (AlreadyExistsException e) { } } finally { client.close(); @@ -170,7 +123,7 @@ public class HiveSetupUtil { } private static void addPartition(IMetaStoreClient client, Table tbl - , List<String> partValues) + , List<String> partValues) throws IOException, TException { Partition part = new Partition(); part.setDbName(tbl.getDbName()); @@ -183,17 +136,17 @@ public class HiveSetupUtil { } private static String makePartPath(List<FieldSchema> partKeys, List<String> partVals) { - if(partKeys.size()!=partVals.size()) { + if (partKeys.size() != partVals.size()) { throw new IllegalArgumentException("Partition 
values:" + partVals + - ", does not match the partition Keys in table :" + partKeys ); + ", does not match the partition Keys in table :" + partKeys); } - StringBuffer buff = new StringBuffer(partKeys.size()*20); - int i=0; - for(FieldSchema schema : partKeys) { + StringBuffer buff = new StringBuffer(partKeys.size() * 20); + int i = 0; + for (FieldSchema schema : partKeys) { buff.append(schema.getName()); buff.append("="); buff.append(partVals.get(i)); - if(i!=partKeys.size()-1) { + if (i != partKeys.size() - 1) { buff.append(Path.SEPARATOR); } ++i; @@ -203,7 +156,7 @@ public class HiveSetupUtil { private static List<FieldSchema> getTableColumns(String[] colNames, String[] colTypes) { List<FieldSchema> fields = new ArrayList<FieldSchema>(); - for (int i=0; i<colNames.length; ++i) { + for (int i = 0; i < colNames.length; ++i) { fields.add(new FieldSchema(colNames[i], colTypes[i], "")); } return fields; @@ -211,10 +164,50 @@ public class HiveSetupUtil { private static List<FieldSchema> getPartitionKeys(String[] partNames) { List<FieldSchema> fields = new ArrayList<FieldSchema>(); - for (int i=0; i < partNames.length; ++i) { - fields.add(new FieldSchema(partNames[i], serdeConstants.STRING_TYPE_NAME, "")); + for (int i = 0; i < partNames.length; ++i) { + fields.add(new FieldSchema(partNames[i], serdeConstants.STRING_TYPE_NAME, "")); } return fields; } + public static class RawFileSystem extends RawLocalFileSystem { + private static final URI NAME; + + static { + try { + NAME = new URI("raw:///"); + } catch (URISyntaxException se) { + throw new IllegalArgumentException("bad uri", se); + } + } + + @Override + public URI getUri() { + return NAME; + } + + @Override + public FileStatus getFileStatus(Path path) throws IOException { + File file = pathToFile(path); + if (!file.exists()) { + throw new FileNotFoundException("Can't find " + path); + } + // get close enough + short mod = 0; + if (file.canRead()) { + mod |= 0444; + } + if (file.canWrite()) { + mod |= 0200; + } + if 
(file.canExecute()) { + mod |= 0111; + } + ShimLoader.getHadoopShims(); + return new FileStatus(file.length(), file.isDirectory(), 1, 1024, + file.lastModified(), file.lastModified(), + FsPermission.createImmutable(mod), "owen", "users", path); + } + } + }
