http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py ---------------------------------------------------------------------- diff --git a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py index 8decc94..53f24b9 100644 --- a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py +++ b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py @@ -12667,6 +12667,204 @@ class HeartbeatWriteIdResult: def __ne__(self, other): return not (self == other) +class GetValidWriteIdsRequest: + """ + Attributes: + - dbName + - tblName + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRING, 'dbName', None, None, ), # 1 + (2, TType.STRING, 'tblName', None, None, ), # 2 + ) + + def __init__(self, dbName=None, tblName=None,): + self.dbName = dbName + self.tblName = tblName + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.dbName = iprot.readString() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.STRING: + self.tblName = iprot.readString() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('GetValidWriteIdsRequest') + if self.dbName is not None: + oprot.writeFieldBegin('dbName', TType.STRING, 1) + oprot.writeString(self.dbName) + oprot.writeFieldEnd() + if self.tblName is not None: + oprot.writeFieldBegin('tblName', TType.STRING, 2) + oprot.writeString(self.tblName) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.dbName is None: + raise TProtocol.TProtocolException(message='Required field dbName is unset!') + if self.tblName is None: + raise TProtocol.TProtocolException(message='Required field tblName is unset!') + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.dbName) + value = (value * 31) ^ hash(self.tblName) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class GetValidWriteIdsResult: + """ + Attributes: + - lowWatermarkId + - highWatermarkId + - areIdsValid + - ids + """ + + thrift_spec = ( + None, # 0 + (1, TType.I64, 'lowWatermarkId', None, None, ), # 1 + (2, TType.I64, 'highWatermarkId', None, None, ), # 2 + (3, TType.BOOL, 'areIdsValid', None, None, ), # 3 + (4, TType.LIST, 'ids', (TType.I64,None), None, ), # 4 + ) + + def __init__(self, lowWatermarkId=None, highWatermarkId=None, areIdsValid=None, ids=None,): + self.lowWatermarkId = lowWatermarkId + self.highWatermarkId = highWatermarkId + self.areIdsValid = areIdsValid + self.ids 
= ids + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.I64: + self.lowWatermarkId = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.I64: + self.highWatermarkId = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.BOOL: + self.areIdsValid = iprot.readBool() + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.LIST: + self.ids = [] + (_etype562, _size559) = iprot.readListBegin() + for _i563 in xrange(_size559): + _elem564 = iprot.readI64() + self.ids.append(_elem564) + iprot.readListEnd() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('GetValidWriteIdsResult') + if self.lowWatermarkId is not None: + oprot.writeFieldBegin('lowWatermarkId', TType.I64, 1) + oprot.writeI64(self.lowWatermarkId) + oprot.writeFieldEnd() + if self.highWatermarkId is not None: + oprot.writeFieldBegin('highWatermarkId', TType.I64, 2) + oprot.writeI64(self.highWatermarkId) + oprot.writeFieldEnd() + if self.areIdsValid is not None: + oprot.writeFieldBegin('areIdsValid', TType.BOOL, 3) + oprot.writeBool(self.areIdsValid) + oprot.writeFieldEnd() + if self.ids is not None: + oprot.writeFieldBegin('ids', TType.LIST, 4) + oprot.writeListBegin(TType.I64, len(self.ids)) + for iter565 in self.ids: + oprot.writeI64(iter565) + oprot.writeListEnd() + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.lowWatermarkId is None: + raise TProtocol.TProtocolException(message='Required field lowWatermarkId is unset!') + if self.highWatermarkId is None: + raise TProtocol.TProtocolException(message='Required field highWatermarkId is unset!') + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.lowWatermarkId) + value = (value * 31) ^ hash(self.highWatermarkId) + value = (value * 31) ^ hash(self.areIdsValid) + value = (value * 31) ^ hash(self.ids) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + class GetAllFunctionsResponse: """ Attributes: @@ -12693,11 +12891,11 @@ class GetAllFunctionsResponse: if fid == 1: if ftype == TType.LIST: self.functions = [] - (_etype562, _size559) = iprot.readListBegin() - for _i563 in xrange(_size559): - _elem564 = Function() - _elem564.read(iprot) - self.functions.append(_elem564) + (_etype569, _size566) = iprot.readListBegin() + for _i570 in xrange(_size566): + _elem571 = Function() + _elem571.read(iprot) + self.functions.append(_elem571) iprot.readListEnd() else: iprot.skip(ftype) @@ -12714,8 +12912,8 @@ class 
GetAllFunctionsResponse: if self.functions is not None: oprot.writeFieldBegin('functions', TType.LIST, 1) oprot.writeListBegin(TType.STRUCT, len(self.functions)) - for iter565 in self.functions: - iter565.write(oprot) + for iter572 in self.functions: + iter572.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop()
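The gen-py hunk above only adds the GetValidWriteIdsRequest/GetValidWriteIdsResult structs; the Python service stub itself is not part of this excerpt. As an illustrative sketch only — assuming the standard gen-py client module hive_metastore.ThriftHiveMetastore exposes get_valid_write_ids (the method name is confirmed by the Ruby and Java changes below), and using a hypothetical metastore endpoint localhost:9083 and table default.mm_table — a caller might consume the new structs like this, mirroring how the Java server code in this patch populates the watermarks and the committed-ID list:

from thrift.transport import TSocket, TTransport
from thrift.protocol import TBinaryProtocol
from hive_metastore import ThriftHiveMetastore          # assumed gen-py service stub location
from hive_metastore.ttypes import GetValidWriteIdsRequest

# Hypothetical endpoint and table names; adjust for a real deployment.
transport = TTransport.TBufferedTransport(TSocket.TSocket('localhost', 9083))
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = ThriftHiveMetastore.Client(protocol)
transport.open()
try:
    req = GetValidWriteIdsRequest(dbName='default', tblName='mm_table')
    result = client.get_valid_write_ids(req)

    # Per the server logic in this patch:
    #   lowWatermarkId  - every write ID at or below this is committed
    #   highWatermarkId - the next ID to be allocated (exclusive upper bound)
    #   ids             - committed IDs between the watermarks, sent with areIdsValid=True
    committed_between = set(result.ids) if (result.areIdsValid and result.ids) else set()

    def is_committed(write_id):
        if write_id <= result.lowWatermarkId:
            return True
        if write_id >= result.highWatermarkId:
            return False
        return write_id in committed_between
finally:
    transport.close()
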
http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb ---------------------------------------------------------------------- diff --git a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb index 95f2075..ca60ba4 100644 --- a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ -2876,6 +2876,50 @@ class HeartbeatWriteIdResult ::Thrift::Struct.generate_accessors self end +class GetValidWriteIdsRequest + include ::Thrift::Struct, ::Thrift::Struct_Union + DBNAME = 1 + TBLNAME = 2 + + FIELDS = { + DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName'}, + TBLNAME => {:type => ::Thrift::Types::STRING, :name => 'tblName'} + } + + def struct_fields; FIELDS; end + + def validate + raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field dbName is unset!') unless @dbName + raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field tblName is unset!') unless @tblName + end + + ::Thrift::Struct.generate_accessors self +end + +class GetValidWriteIdsResult + include ::Thrift::Struct, ::Thrift::Struct_Union + LOWWATERMARKID = 1 + HIGHWATERMARKID = 2 + AREIDSVALID = 3 + IDS = 4 + + FIELDS = { + LOWWATERMARKID => {:type => ::Thrift::Types::I64, :name => 'lowWatermarkId'}, + HIGHWATERMARKID => {:type => ::Thrift::Types::I64, :name => 'highWatermarkId'}, + AREIDSVALID => {:type => ::Thrift::Types::BOOL, :name => 'areIdsValid', :optional => true}, + IDS => {:type => ::Thrift::Types::LIST, :name => 'ids', :element => {:type => ::Thrift::Types::I64}, :optional => true} + } + + def struct_fields; FIELDS; end + + def validate + raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field lowWatermarkId is unset!') unless @lowWatermarkId + raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field highWatermarkId is unset!') unless @highWatermarkId + end + + ::Thrift::Struct.generate_accessors self +end + class GetAllFunctionsResponse include ::Thrift::Struct, ::Thrift::Struct_Union FUNCTIONS = 1 http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb ---------------------------------------------------------------------- diff --git a/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb b/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb index 403e07f..613702f 100644 --- a/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb +++ b/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb @@ -2529,6 +2529,21 @@ module ThriftHiveMetastore raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'heartbeat_write_id failed: unknown result') end + def get_valid_write_ids(req) + send_get_valid_write_ids(req) + return recv_get_valid_write_ids() + end + + def send_get_valid_write_ids(req) + send_message('get_valid_write_ids', Get_valid_write_ids_args, :req => req) + end + + def recv_get_valid_write_ids() + result = receive_message(Get_valid_write_ids_result) + return result.success unless result.success.nil? 
+ raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'get_valid_write_ids failed: unknown result') + end + end class Processor < ::FacebookService::Processor @@ -4396,6 +4411,13 @@ module ThriftHiveMetastore write_result(result, oprot, 'heartbeat_write_id', seqid) end + def process_get_valid_write_ids(seqid, iprot, oprot) + args = read_args(iprot, Get_valid_write_ids_args) + result = Get_valid_write_ids_result.new() + result.success = @handler.get_valid_write_ids(args.req) + write_result(result, oprot, 'get_valid_write_ids', seqid) + end + end # HELPER FUNCTIONS AND STRUCTURES @@ -10092,5 +10114,37 @@ module ThriftHiveMetastore ::Thrift::Struct.generate_accessors self end + class Get_valid_write_ids_args + include ::Thrift::Struct, ::Thrift::Struct_Union + REQ = 1 + + FIELDS = { + REQ => {:type => ::Thrift::Types::STRUCT, :name => 'req', :class => ::GetValidWriteIdsRequest} + } + + def struct_fields; FIELDS; end + + def validate + end + + ::Thrift::Struct.generate_accessors self + end + + class Get_valid_write_ids_result + include ::Thrift::Struct, ::Thrift::Struct_Union + SUCCESS = 0 + + FIELDS = { + SUCCESS => {:type => ::Thrift::Types::STRUCT, :name => 'success', :class => ::GetValidWriteIdsResult} + } + + def struct_fields; FIELDS; end + + def validate + end + + ::Thrift::Struct.generate_accessors self + end + end http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index f99bcd2..e1d41c4 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -119,6 +119,7 @@ import javax.jdo.JDOException; import java.io.IOException; import java.nio.ByteBuffer; +import java.sql.SQLException; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.AbstractMap; @@ -134,6 +135,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Random; import java.util.Set; import java.util.Timer; import java.util.concurrent.Callable; @@ -438,19 +440,19 @@ public class HiveMetaStore extends ThriftHiveMetastore { updateMetrics(); LOG.info("Finished metadata count metrics: " + initDatabaseCount + " databases, " + initTableCount + " tables, " + initPartCount + " partitions."); - metrics.addGauge(MetricsConstant.INIT_TOTAL_DATABASES, new MetricsVariable() { + metrics.addGauge(MetricsConstant.INIT_TOTAL_DATABASES, new MetricsVariable<Object>() { @Override public Object getValue() { return initDatabaseCount; } }); - metrics.addGauge(MetricsConstant.INIT_TOTAL_TABLES, new MetricsVariable() { + metrics.addGauge(MetricsConstant.INIT_TOTAL_TABLES, new MetricsVariable<Object>() { @Override public Object getValue() { return initTableCount; } }); - metrics.addGauge(MetricsConstant.INIT_TOTAL_PARTITIONS, new MetricsVariable() { + metrics.addGauge(MetricsConstant.INIT_TOTAL_PARTITIONS, new MetricsVariable<Object>() { @Override public Object getValue() { return initPartCount; @@ -1264,26 +1266,6 @@ public class HiveMetaStore extends ThriftHiveMetastore { return (ms.getType(typeName) != null); } - private void drop_type_core(final RawStore ms, String typeName) - throws NoSuchObjectException, 
MetaException { - boolean success = false; - try { - ms.openTransaction(); - // drop any partitions - if (!is_type_exists(ms, typeName)) { - throw new NoSuchObjectException(typeName + " doesn't exist"); - } - if (!ms.dropType(typeName)) { - throw new MetaException("Unable to drop type " + typeName); - } - success = ms.commitTransaction(); - } finally { - if (!success) { - ms.rollbackTransaction(); - } - } - } - @Override public boolean drop_type(final String name) throws MetaException, NoSuchObjectException { startFunction("drop_type", ": " + name); @@ -1818,7 +1800,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { //No drop part listener events fired for public listeners historically, for drop table case. //Limiting to internal listeners for now, to avoid unexpected calls for public listeners. if (listener instanceof HMSMetricsListener) { - for (Partition part : partsToDelete) { + for (@SuppressWarnings("unused") Partition part : partsToDelete) { listener.onDropPartition(null); } } @@ -2294,7 +2276,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { } - partFutures.add(threadPool.submit(new Callable() { + partFutures.add(threadPool.submit(new Callable<Partition>() { @Override public Partition call() throws Exception { boolean madeDir = createLocationForAddedPartition(table, part); @@ -2456,8 +2438,8 @@ public class HiveMetaStore extends ThriftHiveMetastore { LOG.info("Not adding partition " + part + " as it already exists"); continue; } - partFutures.add(threadPool.submit(new Callable() { - @Override public Object call() throws Exception { + partFutures.add(threadPool.submit(new Callable<Partition>() { + @Override public Partition call() throws Exception { boolean madeDir = createLocationForAddedPartition(table, part); if (addedPartitions.put(new PartValEqWrapperLite(part), madeDir) != null) { // Technically, for ifNotExists case, we could insert one and discard the other @@ -2474,7 +2456,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { try { for (Future<Partition> partFuture : partFutures) { - Partition part = partFuture.get(); + partFuture.get(); } } catch (InterruptedException | ExecutionException e) { // cancel other tasks @@ -3777,6 +3759,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { } } + @SuppressWarnings("deprecation") Deserializer s = MetaStoreUtils.getDeserializer(curConf, tbl, false); ret = MetaStoreUtils.getFieldsFromDeserializer(tableName, s); } catch (SerDeException e) { @@ -5745,7 +5728,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { throw newMetaException(e); } } - endFunction("partition_name_has_valid_characters", true, null); + endFunction("partition_name_has_valid_characters", true, ex); return ret; } @@ -6044,21 +6027,6 @@ public class HiveMetaStore extends ThriftHiveMetastore { return new GetRoleGrantsForPrincipalResponse(roleMaps); } - /** - * Convert each MRoleMap object into a thrift RolePrincipalGrant object - * @param roles - * @return - */ - private List<RolePrincipalGrant> getRolePrincipalGrants(List<Role> roles) throws MetaException { - List<RolePrincipalGrant> rolePrinGrantList = new ArrayList<RolePrincipalGrant>(); - if (roles != null) { - for (Role role : roles) { - rolePrinGrantList.addAll(getMS().listRoleMembers(role.getRoleName())); - } - } - return rolePrinGrantList; - } - @Override public AggrStats get_aggr_stats_for(PartitionsStatsRequest request) throws NoSuchObjectException, MetaException, TException { @@ -6448,31 +6416,47 @@ public class HiveMetaStore extends ThriftHiveMetastore { } } 
+ private final Random random = new Random(); @Override public GetNextWriteIdResult get_next_write_id(GetNextWriteIdRequest req) throws TException { RawStore ms = getMS(); String dbName = req.getDbName(), tblName = req.getTblName(); startFunction("get_next_write_id", " : db=" + dbName + " tbl=" + tblName); - Exception ex = null; + Exception exception = null; long writeId = -1; - // TODO# see TXN about how to handle conflicts try { - boolean ok = false; - ms.openTransaction(); - try { - Table tbl = ms.getTable(dbName, tblName); - if (tbl == null) { - throw new NoSuchObjectException(dbName + "." + tblName); + int deadlockTryCount = 10; + int deadlockRetryBackoffMs = 200; + while (deadlockTryCount > 0) { + boolean ok = false; + ms.openTransaction(); + try { + Table tbl = ms.getTable(dbName, tblName); + if (tbl == null) { + throw new NoSuchObjectException(dbName + "." + tblName); + } + writeId = tbl.isSetMmNextWriteId() ? tbl.getMmNextWriteId() : 0; + tbl.setMmNextWriteId(writeId + 1); + ms.alterTable(dbName, tblName, tbl); + ok = true; + } finally { + if (!ok) { + ms.rollbackTransaction(); + // Exception should propagate; don't override it by breaking out of the loop. + } else { + Boolean commitResult = ms.commitTransactionExpectDeadlock(); + if (commitResult != null) { + if (commitResult) break; // Assume no exception; ok to break out of the loop. + throw new MetaException("Failed to commit"); + } + } } - writeId = tbl.isSetMmNextWriteId() ? tbl.getMmNextWriteId() : 0; - tbl.setMmNextWriteId(writeId + 1); - ms.alterTable(dbName, tblName, tbl); - ok = true; - } finally { - commitOrRollback(ms, ok); + LOG.warn("Getting the next write ID failed due to a deadlock; retrying"); + Thread.sleep(random.nextInt(deadlockRetryBackoffMs)); } + // Do a separate txn after we have reserved the number. TODO: If we fail, ignore on read. - ok = false; + boolean ok = false; ms.openTransaction(); try { Table tbl = ms.getTable(dbName, tblName); @@ -6482,10 +6466,10 @@ public class HiveMetaStore extends ThriftHiveMetastore { commitOrRollback(ms, ok); } } catch (Exception e) { - ex = e; + exception = e; throwMetaException(e); } finally { - endFunction("get_next_write_id", ex == null, ex, tblName); + endFunction("get_next_write_id", exception == null, exception, tblName); } return new GetNextWriteIdResult(writeId); } @@ -6562,10 +6546,65 @@ public class HiveMetaStore extends ThriftHiveMetastore { assert tw.getState().length() == 1; char state = tw.getState().charAt(0); if (state != MM_WRITE_OPEN) { - throw new MetaException("Invalid write state to finalize: " + state); + throw new MetaException("Invalid write state: " + state); } return tw; } + + @Override + public GetValidWriteIdsResult get_valid_write_ids( + GetValidWriteIdsRequest req) throws TException { + RawStore ms = getMS(); + String dbName = req.getDbName(), tblName = req.getTblName(); + startFunction("get_valid_write_ids", " : db=" + dbName + " tbl=" + tblName); + GetValidWriteIdsResult result = new GetValidWriteIdsResult(); + Exception ex = null; + try { + boolean ok = false; + ms.openTransaction(); + try { + Table tbl = ms.getTable(dbName, tblName); + if (tbl == null) { + throw new InvalidObjectException(dbName + "." + tblName); + } + long nextId = tbl.isSetMmNextWriteId() ? tbl.getMmNextWriteId() : 0; + long watermarkId = tbl.isSetMmWatermarkWriteId() ? tbl.getMmWatermarkWriteId() : -1; + if (nextId > (watermarkId + 1)) { + // There may be some intermediate failed or active writes; get the valid ones. 
+ List<Long> ids = ms.getWriteIds( + dbName, tblName, watermarkId, nextId, MM_WRITE_COMMITTED); + // TODO: we could optimize here and send the smaller of the lists, and also use ranges + if (ids != null) { + Iterator<Long> iter = ids.iterator(); + long oldWatermarkId = watermarkId; + while (iter.hasNext()) { + if (iter.next() != watermarkId + 1) break; + ++watermarkId; + } + long removed = watermarkId - oldWatermarkId; + if (removed > 0) { + ids = ids.subList((int)removed, ids.size()); + } + if (!ids.isEmpty()) { + result.setIds(ids); + result.setAreIdsValid(true); + } + } + } + result.setHighWatermarkId(nextId); + result.setLowWatermarkId(watermarkId); + ok = true; + } finally { + commitOrRollback(ms, ok); + } + } catch (Exception e) { + ex = e; + throwMetaException(e); + } finally { + endFunction("get_valid_write_ids", ex == null, ex, tblName); + } + return result; + } } @@ -7053,7 +7092,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { } private static MetaStoreThread instantiateThread(String classname) throws Exception { - Class c = Class.forName(classname); + Class<?> c = Class.forName(classname); Object o = c.newInstance(); if (MetaStoreThread.class.isAssignableFrom(o.getClass())) { return (MetaStoreThread)o; @@ -7082,7 +7121,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService")); startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidWriteSetService")); } - private static void startHouseKeeperService(HiveConf conf, Class c) throws Exception { + private static void startHouseKeeperService(HiveConf conf, Class<?> c) throws Exception { //todo: when metastore adds orderly-shutdown logic, houseKeeper.stop() //should be called form it HouseKeeperService houseKeeper = (HouseKeeperService)c.newInstance(); http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index 6bd6d92..0325854 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -2404,4 +2404,10 @@ public class HiveMetaStoreClient implements IMetaStoreClient { String dbName, String tableName, long writeId) throws TException { client.heartbeat_write_id(new HeartbeatWriteIdRequest(dbName, tableName, writeId)); } + + @Override + public GetValidWriteIdsResult getValidWriteIds( + String dbName, String tableName) throws TException { + return client.get_valid_write_ids(new GetValidWriteIdsRequest(dbName, tableName)); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java index f5d611d..8706312 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleRequest; 
import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleResponse; import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalRequest; import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalResponse; +import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResult; import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse; import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege; import org.apache.hadoop.hive.metastore.api.HiveObjectRef; @@ -1626,4 +1627,6 @@ public interface IMetaStoreClient { void finalizeTableWrite(String dbName, String tableName, long writeId, boolean commit) throws TException; + + GetValidWriteIdsResult getValidWriteIds(String dbName, String tableName) throws TException; } http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java index 561f3e3..125a3e5 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java @@ -113,15 +113,8 @@ class MetaStoreDirectSql { private final boolean isAggregateStatsCacheEnabled; private AggregateStatsCache aggrStatsCache; - public MetaStoreDirectSql(PersistenceManager pm, Configuration conf) { + public MetaStoreDirectSql(PersistenceManager pm, Configuration conf, DatabaseProduct dbType) { this.pm = pm; - DatabaseProduct dbType = null; - try { - dbType = DatabaseProduct.determineDatabaseProduct(getProductName()); - } catch (SQLException e) { - LOG.warn("Cannot determine database product; assuming OTHER", e); - dbType = DatabaseProduct.OTHER; - } this.dbType = dbType; int batchSize = HiveConf.getIntVar(conf, ConfVars.METASTORE_DIRECT_SQL_PARTITION_BATCH_SIZE); if (batchSize == DETECT_BATCHING) { http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 9dc80b1..fb3b1ad 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -25,6 +25,8 @@ import java.lang.reflect.Field; import java.net.InetAddress; import java.net.URI; import java.nio.ByteBuffer; +import java.sql.Connection; +import java.sql.SQLException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -52,6 +54,7 @@ import javax.jdo.PersistenceManagerFactory; import javax.jdo.Query; import javax.jdo.Transaction; import javax.jdo.datastore.DataStoreCache; +import javax.jdo.datastore.JDOConnection; import javax.jdo.identity.IntIdentity; import com.google.common.collect.Maps; @@ -220,6 +223,7 @@ public class ObjectStore implements RawStore, Configurable { private boolean isInitialized = false; private PersistenceManager pm = null; private MetaStoreDirectSql directSql = null; + private DatabaseProduct dbType = null; private PartitionExpressionProxy expressionProxy = null; private Configuration hiveConf; private volatile int openTrasactionCalls = 0; @@ -329,15 +333,37 @@ public class 
ObjectStore implements RawStore, Configurable { pm = getPersistenceManager(); isInitialized = pm != null; if (isInitialized) { + dbType = determineDatabaseProduct(); expressionProxy = createExpressionProxy(hiveConf); if (HiveConf.getBoolVar(getConf(), ConfVars.METASTORE_TRY_DIRECT_SQL)) { - directSql = new MetaStoreDirectSql(pm, hiveConf); + directSql = new MetaStoreDirectSql(pm, hiveConf, dbType); } } LOG.debug("RawStore: " + this + ", with PersistenceManager: " + pm + " created in the thread with id: " + Thread.currentThread().getId()); } + private DatabaseProduct determineDatabaseProduct() { + try { + return DatabaseProduct.determineDatabaseProduct(getProductName(pm)); + } catch (SQLException e) { + LOG.warn("Cannot determine database product; assuming OTHER", e); + return DatabaseProduct.OTHER; + } + } + + private static String getProductName(PersistenceManager pm) { + JDOConnection jdoConn = pm.getDataStoreConnection(); + try { + return ((Connection)jdoConn.getNativeConnection()).getMetaData().getDatabaseProductName(); + } catch (Throwable t) { + LOG.warn("Error retrieving product name", t); + return null; + } finally { + jdoConn.close(); // We must release the connection before we call other pm methods. + } + } + /** * Creates the proxy used to evaluate expressions. This is here to prevent circular * dependency - ql -> metastore client <-> metastore server -> ql. If server and @@ -511,15 +537,52 @@ public class ObjectStore implements RawStore, Configurable { return result; } - /** - * if this is the commit of the first open call then an actual commit is - * called. - * - * @return Always returns true - */ @Override @SuppressWarnings("nls") public boolean commitTransaction() { + if (!startCommitTransaction()) return false; + + openTrasactionCalls--; + debugLog("Commit transaction: count = " + openTrasactionCalls + ", isactive "+ currentTransaction.isActive()); + if ((openTrasactionCalls == 0) && currentTransaction.isActive()) { + transactionStatus = TXN_STATUS.COMMITED; + currentTransaction.commit(); + } + + return true; + } + + @Override + @CanNotRetry + public Boolean commitTransactionExpectDeadlock() { + if (!startCommitTransaction()) return false; + + if (--openTrasactionCalls != 0) { + String msg = "commitTransactionExpectDeadlock cannot be called for a nested transaction"; + LOG.error(msg); + throw new AssertionError(msg); + } + + transactionStatus = TXN_STATUS.COMMITED; + try { + currentTransaction.commit(); + } catch (Exception ex) { + Throwable candidate = ex; + while (candidate != null && !(candidate instanceof SQLException)) { + candidate = candidate.getCause(); + } + if (candidate == null) throw ex; + if (DatabaseProduct.isDeadlock(dbType, (SQLException)candidate)) { + LOG.info("Deadlock exception during commit: " + candidate.getMessage()); + return null; + } + throw ex; + } + + return true; + } + + private boolean startCommitTransaction() { if (TXN_STATUS.ROLLBACK == transactionStatus) { debugLog("Commit transaction: rollback"); return false; @@ -538,14 +601,6 @@ public class ObjectStore implements RawStore, Configurable { LOG.error("Unbalanced calls to open/commit Transaction", e); throw e; } - openTrasactionCalls--; - debugLog("Commit transaction: count = " + openTrasactionCalls + ", isactive "+ currentTransaction.isActive()); - - if ((openTrasactionCalls == 0) && currentTransaction.isActive()) { - transactionStatus = TXN_STATUS.COMMITED; - currentTransaction.commit(); - } - return true; } @@ -1487,7 +1542,7 @@ public class ObjectStore implements RawStore, Configurable 
{ .getCreateTime(), tbl.getLastAccessTime(), tbl.getRetention(), convertToMFieldSchemas(tbl.getPartitionKeys()), tbl.getParameters(), tbl.getViewOriginalText(), tbl.getViewExpandedText(), - tableType, tbl.isSetMmNextWriteId() ? tbl.getMmNextWriteId() : -1, + tableType, tbl.isSetMmNextWriteId() ? tbl.getMmNextWriteId() : 0, tbl.isSetMmWatermarkWriteId() ? tbl.getMmWatermarkWriteId() : -1); } @@ -2718,7 +2773,8 @@ public class ObjectStore implements RawStore, Configurable { boolean isConfigEnabled = HiveConf.getBoolVar(getConf(), ConfVars.METASTORE_TRY_DIRECT_SQL) && (HiveConf.getBoolVar(getConf(), ConfVars.METASTORE_TRY_DIRECT_SQL_DDL) || !isInTxn); if (isConfigEnabled && directSql == null) { - directSql = new MetaStoreDirectSql(pm, getConf()); + dbType = determineDatabaseProduct(); + directSql = new MetaStoreDirectSql(pm, getConf(), dbType); } if (!allowJdo && isConfigEnabled && !directSql.isCompatibleDatastore()) { @@ -8692,16 +8748,10 @@ public class ObjectStore implements RawStore, Configurable { Query query = null; try { openTransaction(); - dbName = HiveStringUtils.normalizeIdentifier(dbName); - tblName = HiveStringUtils.normalizeIdentifier(tblName); - MTable mtbl = getMTable(dbName, tblName); - if (mtbl == null) { - success = true; - return null; - } query = pm.newQuery(MTableWrite.class, "table.tableName == t1 && table.database.name == t2 && writeId == t3"); query.declareParameters("java.lang.String t1, java.lang.String t2, java.lang.Long t3"); + @SuppressWarnings("unchecked") List<MTableWrite> writes = (List<MTableWrite>) query.execute(tblName, dbName, writeId); pm.retrieveAll(writes); success = true; @@ -8723,4 +8773,34 @@ public class ObjectStore implements RawStore, Configurable { } } + @Override + public List<Long> getWriteIds(String dbName, String tblName, + long watermarkId, long nextWriteId, char state) throws MetaException { + boolean success = false; + Query query = null; + try { + openTransaction(); + query = pm.newQuery("select writeId from org.apache.hadoop.hive.metastore.model.MTableWrite" + + " where table.tableName == t1 && table.database.name == t2 && writeId >= t3" + + " && writeId < t4 && state == t5"); + query.declareParameters("java.lang.String t1, java.lang.String t2, java.lang.Long t3, " + + "java.lang.Long t4, java.lang.String t5"); + query.setResult("writeId"); + query.setOrdering("writeId asc"); + @SuppressWarnings("unchecked") + List<Long> writes = (List<Long>) query.executeWithArray( + tblName, dbName, watermarkId, nextWriteId, String.valueOf(state)); + success = true; + return (writes == null || writes.isEmpty()) ? null : new ArrayList<>(writes); + } finally { + if (success) { + commitTransaction(); + } else { + rollbackTransaction(); + } + if (query != null) { + query.closeAll(); + } + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java index c5359cf..170c07d 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java @@ -94,6 +94,15 @@ public interface RawStore extends Configurable { public abstract boolean commitTransaction(); /** + * Commits transaction and detects if the failure to do so is a deadlock or not. 
+ * Must be called on the top level with regard to openTransaction calls; attempting to + * call this after several nested openTransaction calls will throw. + * @return true or false - same as commitTransaction; null in case of deadlock. + */ + @CanNotRetry + public abstract Boolean commitTransactionExpectDeadlock(); + + /** * Rolls back the current transaction if it is active */ @CanNotRetry @@ -687,4 +696,6 @@ public interface RawStore extends Configurable { MTableWrite getTableWrite(String dbName, String tblName, long writeId) throws MetaException; void createTableWrite(Table tbl, long writeId, char state, long heartbeat); + + List<Long> getWriteIds(String dbName, String tblName, long watermarkId, long nextWriteId, char state) throws MetaException; } http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java index 4fbeb9e..829f0ae 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.metastore.PartFilterExprUtil; import org.apache.hadoop.hive.metastore.PartitionExpressionProxy; import org.apache.hadoop.hive.metastore.RawStore; import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.RawStore.CanNotRetry; import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; @@ -131,13 +132,27 @@ public class HBaseStore implements RawStore { @Override public boolean commitTransaction() { if (--txnNestLevel == 0) { - LOG.debug("Committing HBase transaction"); - getHBase().commit(); + commitInternal(); } return true; } @Override + @CanNotRetry + public Boolean commitTransactionExpectDeadlock() { + if (--txnNestLevel != 0) { + throw new AssertionError("Cannot be called on a nested transaction"); + } + commitInternal(); + return true; + } + + private void commitInternal() { + LOG.debug("Committing HBase transaction"); + getHBase().commit(); + } + + @Override public void rollbackTransaction() { txnNestLevel = 0; LOG.debug("Rolling back HBase transaction"); @@ -2741,4 +2756,12 @@ public class HBaseStore implements RawStore { // TODO: Auto-generated method stub throw new UnsupportedOperationException(); } + + + @Override + public List<Long> getWriteIds( + String dbName, String tblName, long watermarkId, long nextWriteId, char state) { + // TODO: Auto-generated method stub + throw new UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/metastore/src/model/package.jdo ---------------------------------------------------------------------- diff --git a/metastore/src/model/package.jdo b/metastore/src/model/package.jdo index 5126556..bd71056 100644 --- a/metastore/src/model/package.jdo +++ b/metastore/src/model/package.jdo @@ -53,7 +53,7 @@ <column name="PARAM_VALUE" length="4000" jdbc-type="VARCHAR"/> </value> </field> - <field name="ownerName"> + <field name="ownerName"> <column name="OWNER_NAME" length="128" jdbc-type="VARCHAR" allows-null="true"/> </field> <field name="ownerType"> @@ -183,10 +183,10 @@ <column name="TBL_TYPE" length="128" 
jdbc-type="VARCHAR"/> </field> <field name="mmNextWriteId"> - <column name="MM_NEXT_WRITE_ID" jdbc-type="BIGINT"/> + <column name="MM_NEXT_WRITE_ID" jdbc-type="BIGINT" default-value="0" /> </field> <field name="mmWatermarkWriteId"> - <column name="MM_WATERMARK_WRITE_ID" jdbc-type="BIGINT"/> + <column name="MM_WATERMARK_WRITE_ID" jdbc-type="BIGINT" default-value="-1" /> </field> </class> @@ -210,7 +210,7 @@ <column name="PARENT_CD_ID"/> </field> <field name="parentIntegerIndex"> - <column name="PARENT_INTEGER_IDX"/> + <column name="PARENT_INTEGER_IDX"/> </field> <field name="parentTable"> <column name="PARENT_TBL_ID"/> @@ -219,7 +219,7 @@ <column name="CONSTRAINT_TYPE"/> </field> <field name="deleteRule"> - <column name="DELETE_RULE"/> + <column name="DELETE_RULE"/> </field> <field name="updateRule"> <column name="UPDATE_RULE"/> @@ -288,7 +288,7 @@ </embedded> </element> </field> - </class> + </class> <class name="MStringList" identity-type="datastore" table="Skewed_STRING_LIST" detachable="true"> <datastore-identity> @@ -308,7 +308,7 @@ <column name="SD_ID"/> </datastore-identity> <field name="cd"> - <column name="CD_ID"/> + <column name="CD_ID"/> </field> <field name="location"> <column name="LOCATION" length="4000" jdbc-type="VARCHAR"/> @@ -1003,7 +1003,7 @@ <field name="className"> <column name="CLASS_NAME" length="4000" jdbc-type="VARCHAR"/> </field> - <field name="ownerName"> + <field name="ownerName"> <column name="OWNER_NAME" length="128" jdbc-type="VARCHAR"/> </field> <field name="ownerType"> http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java index 9fffd3f..98c543f 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java @@ -878,4 +878,16 @@ public class DummyRawStoreControlledCommit implements RawStore, Configurable { public MTableWrite getTableWrite(String dbName, String tblName, long writeId) { return null; } + + @Override + @CanNotRetry + public Boolean commitTransactionExpectDeadlock() { + return null; + } + + @Override + public List<Long> getWriteIds( + String dbName, String tblName, long watermarkId, long nextWriteId, char state) { + return null; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java index a763085..8e54b16 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java @@ -104,14 +104,17 @@ public class DummyRawStoreForJdoConnection implements RawStore { @Override public boolean commitTransaction() { + return false; + } + @Override + @CanNotRetry + public Boolean commitTransactionExpectDeadlock() { return false; } @Override public void rollbackTransaction() { - - } @Override @@ -893,6 +896,12 @@ public 
class DummyRawStoreForJdoConnection implements RawStore { public MTableWrite getTableWrite(String dbName, String tblName, long writeId) { return null; } + + @Override + public List<Long> getWriteIds( + String dbName, String tblName, long watermarkId, long nextWriteId, char state) { + return null; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/ql/src/java/org/apache/hadoop/hive/ql/Driver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 42d398d..45a80e3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -39,6 +39,7 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidWriteIds; import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; @@ -46,8 +47,11 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.conf.HiveVariableSource; import org.apache.hadoop.hive.conf.VariableSubstitution; +import org.apache.hadoop.hive.metastore.LockComponentBuilder; import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.LockComponent; import org.apache.hadoop.hive.metastore.api.Schema; import org.apache.hadoop.hive.ql.exec.ConditionalTask; import org.apache.hadoop.hive.ql.exec.ExplainTask; @@ -71,6 +75,7 @@ import org.apache.hadoop.hive.ql.hooks.QueryLifeTimeHookContext; import org.apache.hadoop.hive.ql.hooks.QueryLifeTimeHookContextImpl; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.lockmgr.HiveLock; import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.lockmgr.LockException; @@ -1416,6 +1421,11 @@ public class Driver implements CommandProcessor { return rollback(createProcessorResponse(ret)); } } + try { + acquireWriteIds(plan, conf); + } catch (HiveException e) { + return handleHiveException(e, 1); + } ret = execute(); if (ret != 0) { //if needRequireLock is false, the release here will do nothing because there is no lock @@ -1458,6 +1468,34 @@ public class Driver implements CommandProcessor { return createProcessorResponse(ret); } + private static void acquireWriteIds(QueryPlan plan, HiveConf conf) throws HiveException { + // Output IDs are put directly into FileSinkDesc; here, we only need to take care of inputs. 
+ for (ReadEntity input : plan.getInputs()) { + Table t = extractMmTable(input); + if (t == null) continue; + ValidWriteIds ids = Hive.get().getValidWriteIdsForTable(t.getDbName(), t.getTableName()); + ids.addToConf(conf, t.getDbName(), t.getTableName()); + if (plan.getFetchTask() != null) { + ids.addToConf(plan.getFetchTask().getFetchConf(), t.getDbName(), t.getTableName()); + } + } + } + + private static Table extractMmTable(ReadEntity input) { + Table t = null; + switch (input.getType()) { + case TABLE: + t = input.getTable(); + break; + case DUMMYPARTITION: + case PARTITION: + t = input.getPartition().getTable(); + break; + default: return null; + } + return (t != null && !t.isTemporary() && AcidUtils.isMmTable(t)) ? t : null; + } + private CommandProcessorResponse rollback(CommandProcessorResponse cpr) { //console.printError(cpr.toString()); try { http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java index 601ad08..7375cd4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java @@ -30,6 +30,7 @@ import java.util.Properties; import org.apache.commons.lang3.StringEscapeUtils; import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -695,4 +696,8 @@ public class FetchOperator implements Serializable { return inputFormat.getRecordReader(getInputSplit(), job, Reporter.NULL); } } + + public Configuration getJobConf() { + return job; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java index 8c7d99d..93c03fd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java @@ -24,6 +24,7 @@ import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.DriverContext; @@ -193,4 +194,8 @@ public class FetchTask extends Task<FetchWork> implements Serializable { } } + public Configuration getFetchConf() { + return fetch.getJobConf(); + } + } http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 6a0143a..e4e0153 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -40,10 +40,10 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import 
org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.common.ValidWriteIds; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.CompilationOpContext; @@ -239,7 +239,8 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements } } if (isMmTable) { - Path manifestPath = new Path(specPath, "_tmp." + getMmPrefixedTaskId() + MANIFEST_EXTENSION); + Path manifestPath = new Path(specPath, "_tmp." + ValidWriteIds.getMmFilePrefix( + conf.getMmWriteId()) + "_" + taskId + MANIFEST_EXTENSION); Utilities.LOG14535.info("Writing manifest to " + manifestPath + " with " + commitPaths); try { try (FSDataOutputStream out = fs.create(manifestPath)) { @@ -323,11 +324,12 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements } outPaths[filesIdx] = getTaskOutPath(taskId); } else { + String subdirPath = ValidWriteIds.getMmFilePrefix(conf.getMmWriteId()) + "/" + taskId; if (!bDynParts && !isSkewedStoredAsSubDirectories) { - finalPaths[filesIdx] = getFinalPath(getMmPrefixedTaskId(), specPath, extension); + finalPaths[filesIdx] = getFinalPath(subdirPath, specPath, extension); } else { - // TODO# wrong! - finalPaths[filesIdx] = getFinalPath(getMmPrefixedTaskId(), specPath, extension); + // TODO# wrong! special case #N bucketing + finalPaths[filesIdx] = getFinalPath(subdirPath, specPath, extension); } outPaths[filesIdx] = finalPaths[filesIdx]; } @@ -721,10 +723,6 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements } } - private String getMmPrefixedTaskId() { - return AcidUtils.getMmFilePrefix(conf.getMmWriteId()) + taskId; - } - protected Writable recordValue; @@ -1195,21 +1193,6 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements super.jobCloseOp(hconf, success); } - private static class ExecPrefixPathFilter implements PathFilter { - private final String prefix, tmpPrefix; - public ExecPrefixPathFilter(String prefix) { - this.prefix = prefix; - this.tmpPrefix = "_tmp." + prefix; - } - - @Override - public boolean accept(Path path) { - String name = path.getName(); - return name.startsWith(prefix) || name.startsWith(tmpPrefix); - } - } - - private void handleMmTable(Path specPath, Configuration hconf, boolean success, DynamicPartitionCtx dpCtx, FileSinkDesc conf, Reporter reporter) throws IOException, HiveException { @@ -1217,7 +1200,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements int targetLevel = (dpCtx == null) ? 
1 : dpCtx.getNumDPCols(); if (!success) { FileStatus[] statuses = HiveStatsUtils.getFileStatusRecurse(specPath, targetLevel, fs, - new ExecPrefixPathFilter(AcidUtils.getMmFilePrefix(conf.getMmWriteId()))); + new ValidWriteIds.IdPathFilter(conf.getMmWriteId(), true)); for (FileStatus status : statuses) { Utilities.LOG14535.info("Deleting " + status.getPath() + " on failure"); tryDelete(fs, status.getPath()); @@ -1225,15 +1208,19 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements return; } FileStatus[] statuses = HiveStatsUtils.getFileStatusRecurse(specPath, targetLevel, fs, - new ExecPrefixPathFilter(AcidUtils.getMmFilePrefix(conf.getMmWriteId()))); + new ValidWriteIds.IdPathFilter(conf.getMmWriteId(), true)); if (statuses == null) return; LinkedList<FileStatus> results = new LinkedList<>(); List<Path> manifests = new ArrayList<>(statuses.length); for (FileStatus status : statuses) { if (status.getPath().getName().endsWith(MANIFEST_EXTENSION)) { manifests.add(status.getPath()); + } else if (!status.isDirectory()) { + Path path = status.getPath(); + Utilities.LOG14535.warn("Unknown file found - neither a manifest nor directory: " + path); + tryDelete(fs, path); } else { - results.add(status); + results.addAll(Lists.newArrayList(fs.listStatus(status.getPath()))); } } HashSet<String> committed = new HashSet<>(); @@ -1254,7 +1241,10 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements if (!committed.remove(rfs.getPath().toString())) { iter.remove(); Utilities.LOG14535.info("Deleting " + rfs.getPath() + " that was not committed"); - tryDelete(fs, rfs.getPath()); + // We should actually succeed here - if we fail, don't commit the query. + if (!fs.delete(rfs.getPath(), true)) { + throw new HiveException("Failed to delete an uncommitted path " + rfs.getPath()); + } } } if (!committed.isEmpty()) { @@ -1268,6 +1258,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements if (results.isEmpty()) return; FileStatus[] finalResults = results.toArray(new FileStatus[results.size()]); + // TODO# dp will break - removeTempOrDuplicateFiles assumes dirs in results. Why? We recurse... List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles( fs, finalResults, dpCtx, conf, hconf); // create empty buckets if necessary @@ -1278,7 +1269,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements private void tryDelete(FileSystem fs, Path path) { try { - fs.delete(path, false); + fs.delete(path, true); } catch (IOException ex) { LOG.error("Failed to delete " + path, ex); } http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index f2389ea..3be21c4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -314,17 +314,9 @@ public class MoveTask extends Task<MoveWork> implements Serializable { checkFileFormats(db, tbd, table); boolean isAcid = work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID; - if (tbd.isMmTable()) { - if (tbd.getReplace()) { - // TODO#: would need a list of new files to support. Then, old ones only would need - // to be removed from MS (and FS). Also, per-partition IOW is problematic for - // the prefix case. 
- throw new HiveException("Replace and MM are not supported"); - } - if (isAcid) { - // TODO# need to make sure ACID writes to final directories. Otherwise, might need to move. - throw new HiveException("ACID and MM are not supported"); - } + if (tbd.isMmTable() && isAcid) { + // TODO# need to make sure ACID writes to final directories. Otherwise, might need to move. + throw new HiveException("ACID and MM are not supported"); } // Create a data container http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 9e6a201..03abdc1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -123,6 +123,7 @@ import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat; import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat; import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat; import org.apache.hadoop.hive.ql.io.RCFile; import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat; @@ -161,6 +162,7 @@ import org.apache.hadoop.hive.ql.stats.StatsFactory; import org.apache.hadoop.hive.ql.stats.StatsPublisher; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.Serializer; @@ -192,6 +194,7 @@ import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.SequenceFileInputFormat; import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Shell; import org.apache.hive.common.util.ReflectionUtil; @@ -199,6 +202,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.esotericsoftware.kryo.Kryo; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; /** @@ -651,6 +655,7 @@ public final class Utilities { } } + @VisibleForTesting public static TableDesc defaultTd; static { // by default we expect ^A separated strings @@ -658,7 +663,16 @@ public final class Utilities { // PlanUtils.getDefaultTableDesc(String separatorCode, String columns) // or getBinarySortableTableDesc(List<FieldSchema> fieldSchemas) when // we know the column names. - defaultTd = PlanUtils.getDefaultTableDesc("" + Utilities.ctrlaCode); + /** + * Generate the table descriptor of MetadataTypedColumnsetSerDe with the + * separatorCode. MetaDataTypedColumnsetSerDe is used because LazySimpleSerDe + * does not support a table with a single column "col" with type + * "array<string>". 
+ */ + defaultTd = new TableDesc(TextInputFormat.class, IgnoreKeyTextOutputFormat.class, + Utilities.makeProperties(org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT, + "" + Utilities.ctrlaCode, serdeConstants.SERIALIZATION_LIB, + MetadataTypedColumnsetSerDe.class.getName())); } public static final int carriageReturnCode = 13; @@ -1528,14 +1542,9 @@ public final class Utilities { // get the missing buckets and generate empty buckets String taskID1 = taskIDToFile.keySet().iterator().next(); Path bucketPath = taskIDToFile.values().iterator().next().getPath(); + Utilities.LOG14535.info("Bucket path " + bucketPath); for (int j = 0; j < dpCtx.getNumBuckets(); ++j) { - String taskID2 = replaceTaskId(taskID1, j); - if (!taskIDToFile.containsKey(taskID2)) { - // create empty bucket, file name should be derived from taskID2 - URI bucketUri = bucketPath.toUri(); - String path2 = replaceTaskIdFromFilename(bucketUri.getPath().toString(), j); - result.add(new Path(bucketUri.getScheme(), bucketUri.getAuthority(), path2)); - } + addBucketFileIfMissing(result, taskIDToFile, taskID1, bucketPath, j); } } } @@ -1550,14 +1559,9 @@ public final class Utilities { // get the missing buckets and generate empty buckets for non-dynamic partition String taskID1 = taskIDToFile.keySet().iterator().next(); Path bucketPath = taskIDToFile.values().iterator().next().getPath(); + Utilities.LOG14535.info("Bucket path " + bucketPath); for (int j = 0; j < conf.getTable().getNumBuckets(); ++j) { - String taskID2 = replaceTaskId(taskID1, j); - if (!taskIDToFile.containsKey(taskID2)) { - // create empty bucket, file name should be derived from taskID2 - URI bucketUri = bucketPath.toUri(); - String path2 = replaceTaskIdFromFilename(bucketUri.getPath().toString(), j); - result.add(new Path(bucketUri.getScheme(), bucketUri.getAuthority(), path2)); - } + addBucketFileIfMissing(result, taskIDToFile, taskID1, bucketPath, j); } } } @@ -1565,6 +1569,19 @@ public final class Utilities { return result; } + private static void addBucketFileIfMissing(List<Path> result, + HashMap<String, FileStatus> taskIDToFile, String taskID1, Path bucketPath, int j) { + // TODO# this will probably break with directories cause buckets would be above (or not?) 
+ String taskID2 = replaceTaskId(taskID1, j); + if (!taskIDToFile.containsKey(taskID2)) { + // create empty bucket, file name should be derived from taskID2 + URI bucketUri = bucketPath.toUri(); + String path2 = replaceTaskIdFromFilename(bucketUri.getPath().toString(), j); + Utilities.LOG14535.info("Creating an empty bucket file " + path2); + result.add(new Path(bucketUri.getScheme(), bucketUri.getAuthority(), path2)); + } + } + public static HashMap<String, FileStatus> removeTempOrDuplicateFiles(FileStatus[] items, FileSystem fs) throws IOException { @@ -2976,8 +2993,9 @@ public final class Utilities { // The alias may not have any path Path path = null; - for (Path file : new LinkedList<Path>(work.getPathToAliases().keySet())) { - List<String> aliases = work.getPathToAliases().get(file); + for (Map.Entry<Path, ArrayList<String>> e : work.getPathToAliases().entrySet()) { + Path file = e.getKey(); + List<String> aliases = e.getValue(); if (aliases.contains(alias)) { path = file; http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 1ef15cb..70b129e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -1167,8 +1167,4 @@ public class AcidUtils { } return AcidOperationalProperties.parseString(resultStr); } - - public static String getMmFilePrefix(long mmWriteId) { - return "mm_" + mmWriteId + "_"; - } } http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index c4b9940..0510e08 100755 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -23,9 +23,11 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; import java.io.DataInput; import java.io.DataOutput; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -39,8 +41,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.common.ValidWriteIds; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil; @@ -345,7 +349,10 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> */ private void addSplitsForGroup(List<Path> dirs, TableScanOperator tableScan, JobConf conf, InputFormat inputFormat, Class<? 
extends InputFormat> inputFormatClass, int splits, - TableDesc table, List<InputSplit> result) throws IOException { + TableDesc table, Map<String, ValidWriteIds> writeIdMap, List<InputSplit> result) + throws IOException { + ValidWriteIds writeIds = extractWriteIds(writeIdMap, conf, table.getTableName()); + Utilities.LOG14535.info("Observing " + table.getTableName() + ": " + writeIds); Utilities.copyTablePropertiesToConf(table, conf); @@ -353,7 +360,19 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> pushFilters(conf, tableScan); } - FileInputFormat.setInputPaths(conf, dirs.toArray(new Path[dirs.size()])); + if (writeIds == null) { + FileInputFormat.setInputPaths(conf, dirs.toArray(new Path[dirs.size()])); + } else { + List<Path> finalPaths = new ArrayList<>(dirs.size()); + for (Path dir : dirs) { + processForWriteIds(dir, conf, writeIds, finalPaths); + } + if (finalPaths.isEmpty()) { + LOG.warn("No valid inputs found in " + dirs); + return; + } + FileInputFormat.setInputPaths(conf, finalPaths.toArray(new Path[finalPaths.size()])); + } conf.setInputFormat(inputFormat.getClass()); int headerCount = 0; @@ -373,6 +392,24 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> } } + private void processForWriteIds(Path dir, JobConf conf, + ValidWriteIds writeIds, List<Path> finalPaths) throws IOException { + FileStatus[] files = dir.getFileSystem(conf).listStatus(dir); // TODO: batch? + for (FileStatus file : files) { + Path subdir = file.getPath(); + if (!file.isDirectory()) { + Utilities.LOG14535.warn("Found a file not in subdirectory " + subdir); + continue; + } + if (!writeIds.isValidInput(subdir)) { + Utilities.LOG14535.warn("Ignoring an uncommitted directory " + subdir); + continue; + } + Utilities.LOG14535.info("Adding input " + subdir); + finalPaths.add(subdir); + } + } + Path[] getInputPaths(JobConf job) throws IOException { Path[] dirs; if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { @@ -416,6 +453,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> StringBuilder readColumnNamesBuffer = new StringBuilder(newjob. get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "")); // for each dir, get the InputFormat, and do getSplits. + Map<String, ValidWriteIds> writeIdMap = new HashMap<>(); for (Path dir : dirs) { PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir); Class<? 
extends InputFormat> inputFormatClass = part.getInputFileFormatClass(); @@ -466,7 +504,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> addSplitsForGroup(currentDirs, currentTableScan, newjob, getInputFormatFromCache(currentInputFormatClass, job), currentInputFormatClass, currentDirs.size()*(numSplits / dirs.length), - currentTable, result); + currentTable, writeIdMap, result); } currentDirs.clear(); @@ -488,7 +526,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> addSplitsForGroup(currentDirs, currentTableScan, newjob, getInputFormatFromCache(currentInputFormatClass, job), currentInputFormatClass, currentDirs.size()*(numSplits / dirs.length), - currentTable, result); + currentTable, writeIdMap, result); } Utilities.clearWorkMapForConf(job); @@ -499,6 +537,19 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> return result.toArray(new HiveInputSplit[result.size()]); } + private static ValidWriteIds extractWriteIds(Map<String, ValidWriteIds> writeIdMap, + JobConf newjob, String tableName) { + if (StringUtils.isBlank(tableName)) return null; + ValidWriteIds writeIds = writeIdMap.get(tableName); + if (writeIds == null) { + writeIds = ValidWriteIds.createFromConf(newjob, tableName); + writeIdMap.put(tableName, writeIds != null ? writeIds : ValidWriteIds.NO_WRITE_IDS); + } else if (writeIds == ValidWriteIds.NO_WRITE_IDS) { + writeIds = null; + } + return writeIds; + } + private void pushProjection(final JobConf newjob, final StringBuilder readColumnsBuffer, final StringBuilder readColumnNamesBuffer) { String readColIds = readColumnsBuffer.toString(); http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 2ba4fa2..f3609df 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -67,6 +67,7 @@ import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.common.ValidWriteIds; import org.apache.hadoop.hive.common.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.hive.common.classification.InterfaceStability.Unstable; import org.apache.hadoop.hive.conf.HiveConf; @@ -96,6 +97,7 @@ import org.apache.hadoop.hive.metastore.api.Function; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalRequest; import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalResponse; +import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResult; import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege; import org.apache.hadoop.hive.metastore.api.HiveObjectRef; import org.apache.hadoop.hive.metastore.api.HiveObjectType; @@ -1563,6 +1565,11 @@ public class Hive { if (areEventsForDmlNeeded(tbl, oldPart)) { newFiles = listFilesCreatedByQuery(loadPath, mmWriteId); } + if (replace) { + Path tableDest = tbl.getPath(); + deleteOldPathForReplace(newPartPath, oldPartPath, + getConf(), new ValidWriteIds.IdPathFilter(mmWriteId, false)); + } } else { if (replace || (oldPart == null && !isAcid)) { 
replaceFiles(tbl.getPath(), loadPath, newPartPath, oldPartPath, getConf(), @@ -1652,7 +1659,7 @@ public class Hive { private List<Path> listFilesCreatedByQuery(Path loadPath, long mmWriteId) throws HiveException { List<Path> newFiles = new ArrayList<Path>(); - final String filePrefix = AcidUtils.getMmFilePrefix(mmWriteId); + final String filePrefix = ValidWriteIds.getMmFilePrefix(mmWriteId); FileStatus[] srcs; FileSystem srcFs; try { @@ -1920,7 +1927,7 @@ private void constructOneLBLocationMap(FileStatus fSta, for (Future future : futures) { future.get(); } - // TODO# we would commit the txn to metastore here + // TODO# special case #N - DP - we would commit the txn to metastore here } catch (InterruptedException | ExecutionException e) { LOG.debug("Cancelling " + futures.size() + " dynamic loading tasks"); //cancel other futures @@ -1993,6 +2000,11 @@ private void constructOneLBLocationMap(FileStatus fSta, } } } else { + if (replace) { + Path tableDest = tbl.getPath(); + deleteOldPathForReplace(tableDest, tableDest, sessionConf, + new ValidWriteIds.IdPathFilter(mmWriteId, false)); + } newFiles = listFilesCreatedByQuery(loadPath, mmWriteId); } if (!this.getConf().getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) { @@ -3376,39 +3388,10 @@ private void constructOneLBLocationMap(FileStatus fSta, } if (oldPath != null) { - boolean oldPathDeleted = false; - boolean isOldPathUnderDestf = false; - FileStatus[] statuses = null; - try { - FileSystem oldFs = oldPath.getFileSystem(conf); - statuses = oldFs.listStatus(oldPath, FileUtils.HIDDEN_FILES_PATH_FILTER); - // Do not delete oldPath if: - // - destf is subdir of oldPath - isOldPathUnderDestf = isSubDir(oldPath, destf, oldFs, destFs, false); - if (isOldPathUnderDestf) { - // if oldPath is destf or its subdir, its should definitely be deleted, otherwise its - // existing content might result in incorrect (extra) data. - // But not sure why we changed not to delete the oldPath in HIVE-8750 if it is - // not the destf or its subdir? - oldPathDeleted = trashFiles(oldFs, statuses, conf); - } - } catch (IOException e) { - if (isOldPathUnderDestf) { - // if oldPath is a subdir of destf but it could not be cleaned - throw new HiveException("Directory " + oldPath.toString() - + " could not be cleaned up.", e); - } else { - //swallow the exception since it won't affect the final result - LOG.warn("Directory " + oldPath.toString() + " cannot be cleaned: " + e, e); - } - } - if (statuses != null && statuses.length > 0) { - if (isOldPathUnderDestf && !oldPathDeleted) { - throw new HiveException("Destination directory " + destf + " has not be cleaned up."); - } - } + deleteOldPathForReplace(destf, oldPath, conf, FileUtils.HIDDEN_FILES_PATH_FILTER); } + // TODO# what are the paths that use this? MM tables will need to do this beforehand // first call FileUtils.mkdir to make sure that destf directory exists, if not, it creates // destf with inherited permissions boolean inheritPerms = HiveConf.getBoolVar(conf, HiveConf.ConfVars @@ -3442,6 +3425,37 @@ private void constructOneLBLocationMap(FileStatus fSta, } + private void deleteOldPathForReplace(Path destPath, Path oldPath, HiveConf conf, + PathFilter pathFilter) throws HiveException { + boolean isOldPathUnderDestf = false; + try { + FileSystem oldFs = oldPath.getFileSystem(conf); + FileSystem destFs = destPath.getFileSystem(conf); + // if oldPath is destf or its subdir, its should definitely be deleted, otherwise its + // existing content might result in incorrect (extra) data. 
+ // But not sure why we changed not to delete the oldPath in HIVE-8750 if it is + // not the destf or its subdir? + isOldPathUnderDestf = isSubDir(oldPath, destPath, oldFs, destFs, false); + if (isOldPathUnderDestf) { + FileStatus[] statuses = oldFs.listStatus(oldPath, pathFilter); + if (statuses != null && statuses.length > 0 && !trashFiles(oldFs, statuses, conf)) { + throw new HiveException("Destination directory " + destPath + + " has not been cleaned up."); + } + } + } catch (IOException e) { + if (isOldPathUnderDestf) { + // if oldPath is a subdir of destf but it could not be cleaned + throw new HiveException("Directory " + oldPath.toString() + + " could not be cleaned up.", e); + } else { + //swallow the exception since it won't affect the final result + LOG.warn("Directory " + oldPath.toString() + " cannot be cleaned: " + e, e); + } + } + } + + /** * Trashes or deletes all files under a directory. Leaves the directory as is. * @param fs FileSystem to use @@ -4007,7 +4021,6 @@ private void constructOneLBLocationMap(FileStatus fSta, } } - public long getNextTableWriteId(String dbName, String tableName) throws HiveException { try { return getMSC().getNextTableWriteId(dbName, tableName); @@ -4015,4 +4028,17 @@ private void constructOneLBLocationMap(FileStatus fSta, throw new HiveException(e); } } + + public ValidWriteIds getValidWriteIdsForTable( + String dbName, String tableName) throws HiveException { + try { + // TODO: decode ID ranges here if we use that optimization + GetValidWriteIdsResult result = getMSC().getValidWriteIds(dbName, tableName); + return new ValidWriteIds(result.getLowWatermarkId(), result.getHighWatermarkId(), + result.isSetAreIdsValid() && result.isAreIdsValid(), + result.isSetIds() ? new HashSet<Long>(result.getIds()) : null); + } catch (Exception e) { + throw new HiveException(e); + } + } }; http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index bb7001a..675bfd0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -1808,7 +1808,7 @@ public final class GenMapRedUtils { // Create the required temporary file in the HDFS location if the destination // path of the FileSinkOperator table is a blobstore path. - // TODO# HERE + // TODO# special case #N - linked FDs (unions?) Path tmpDir = baseCtx.getTempDirForPath(fileSinkDesc.getDestPath()); // Change all the linked file sink descriptors http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AnnotateRunTimeStatsOptimizer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AnnotateRunTimeStatsOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AnnotateRunTimeStatsOptimizer.java index e2887fd..ee67443 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AnnotateRunTimeStatsOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AnnotateRunTimeStatsOptimizer.java @@ -71,7 +71,6 @@ public class AnnotateRunTimeStatsOptimizer implements PhysicalPlanResolver { Task<? extends Serializable> currTask = (Task<? 
extends Serializable>) nd; Set<Operator<? extends OperatorDesc>> ops = new HashSet<>(); - /* TODO# wtf if (currTask instanceof MapRedTask) { MapRedTask mr = (MapRedTask) currTask; ops.addAll(mr.getWork().getAllOperators()); @@ -85,7 +84,7 @@ public class AnnotateRunTimeStatsOptimizer implements PhysicalPlanResolver { for (BaseWork w : sparkWork.getAllWork()) { ops.addAll(w.getAllOperators()); } - }*/ + } setOrAnnotateStats(ops, physicalContext.getParseContext()); return null; http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java index 422be8e..93fe0e9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java @@ -206,7 +206,7 @@ public abstract class TaskCompiler { } } else if (!isCStats) { for (LoadTableDesc ltd : loadTableWork) { - // TODO# HERE + // TODO# move task is created here; handle MM tables Task<MoveWork> tsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf); mvTask.add(tsk); // Check to see if we are stale'ing any indexes and auto-update them if we want http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java index 5cc3663..1be4d84 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java @@ -54,6 +54,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.mapred.JobConf; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Interner; /** @@ -375,6 +376,7 @@ public class MapWork extends BaseWork { } @SuppressWarnings("nls") + @VisibleForTesting public void addMapWork(Path path, String alias, Operator<?> work, PartitionDesc pd) { ArrayList<String> curAliases = pathToAliases.get(path); http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java index 5dc3aa6..f055cde 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java @@ -381,20 +381,6 @@ public final class PlanUtils { } /** - * Generate the table descriptor of MetadataTypedColumnsetSerDe with the - * separatorCode. MetaDataTypedColumnsetSerDe is used because LazySimpleSerDe - * does not support a table with a single column "col" with type - * "array<string>". - */ - public static TableDesc getDefaultTableDesc(String separatorCode) { - return new TableDesc( - TextInputFormat.class, IgnoreKeyTextOutputFormat.class, Utilities - .makeProperties( - org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT,separatorCode, - serdeConstants.SERIALIZATION_LIB,MetadataTypedColumnsetSerDe.class.getName())); - } - - /** * Generate the table descriptor for reduce key. 
*/ public static TableDesc getReduceKeyTableDesc(List<FieldSchema> fieldSchemas,

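For readers following the FileSinkOperator and Hive.java hunks above: the commit swaps the old ExecPrefixPathFilter(AcidUtils.getMmFilePrefix(...)) for ValidWriteIds.IdPathFilter(mmWriteId, ...). The sketch below is a hypothetical stand-in, not the actual Hive class: it assumes the filter keeps (or, with the flag flipped, excludes) paths named with the "mm_<writeId>_" prefix that the removed AcidUtils.getMmFilePrefix used to build, which is consistent with the two call sites passing true when selecting this query's own output and false when cleaning up everything else during a replace. The class name and the exact meaning of the boolean are assumptions.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    // Illustrative sketch only; the real implementation lives in
    // org.apache.hadoop.hive.common.ValidWriteIds.IdPathFilter.
    public class MmIdPathFilterSketch implements PathFilter {
      private final String prefix;        // e.g. "mm_17_" for write ID 17
      private final boolean keepMatching; // assumption: true = accept matching paths, false = accept the rest

      public MmIdPathFilterSketch(long mmWriteId, boolean keepMatching) {
        // prefix format taken from the removed AcidUtils.getMmFilePrefix
        this.prefix = "mm_" + mmWriteId + "_";
        this.keepMatching = keepMatching;
      }

      @Override
      public boolean accept(Path path) {
        return path.getName().startsWith(prefix) == keepMatching;
      }
    }

Used the same way as in the hunks above, e.g. HiveStatsUtils.getFileStatusRecurse(specPath, level, fs, new MmIdPathFilterSketch(writeId, true)) to list only the directories produced under this statement's write ID.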