http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/gen/thrift/gen-php/metastore/ThriftHiveMetastore.php ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/gen/thrift/gen-php/metastore/ThriftHiveMetastore.php b/standalone-metastore/src/gen/thrift/gen-php/metastore/ThriftHiveMetastore.php index 5e3dff1..7a8a42a 100644 --- a/standalone-metastore/src/gen/thrift/gen-php/metastore/ThriftHiveMetastore.php +++ b/standalone-metastore/src/gen/thrift/gen-php/metastore/ThriftHiveMetastore.php @@ -1514,6 +1514,20 @@ interface ThriftHiveMetastoreIf extends \FacebookServiceIf { * @throws \metastore\MetaException */ public function get_serde(\metastore\GetSerdeRequest $rqst); + /** + * @param string $dbName + * @param string $tableName + * @param int $txnId + * @return \metastore\LockResponse + */ + public function get_lock_materialization_rebuild($dbName, $tableName, $txnId); + /** + * @param string $dbName + * @param string $tableName + * @param int $txnId + * @return bool + */ + public function heartbeat_lock_materialization_rebuild($dbName, $tableName, $txnId); } class ThriftHiveMetastoreClient extends \FacebookServiceClient implements \metastore\ThriftHiveMetastoreIf { @@ -12887,6 +12901,112 @@ class ThriftHiveMetastoreClient extends \FacebookServiceClient implements \metas throw new \Exception("get_serde failed: unknown result"); } + public function get_lock_materialization_rebuild($dbName, $tableName, $txnId) + { + $this->send_get_lock_materialization_rebuild($dbName, $tableName, $txnId); + return $this->recv_get_lock_materialization_rebuild(); + } + + public function send_get_lock_materialization_rebuild($dbName, $tableName, $txnId) + { + $args = new \metastore\ThriftHiveMetastore_get_lock_materialization_rebuild_args(); + $args->dbName = $dbName; + $args->tableName = $tableName; + $args->txnId = $txnId; + $bin_accel = ($this->output_ instanceof TBinaryProtocolAccelerated) && 
function_exists('thrift_protocol_write_binary'); + if ($bin_accel) + { + thrift_protocol_write_binary($this->output_, 'get_lock_materialization_rebuild', TMessageType::CALL, $args, $this->seqid_, $this->output_->isStrictWrite()); + } + else + { + $this->output_->writeMessageBegin('get_lock_materialization_rebuild', TMessageType::CALL, $this->seqid_); + $args->write($this->output_); + $this->output_->writeMessageEnd(); + $this->output_->getTransport()->flush(); + } + } + + public function recv_get_lock_materialization_rebuild() + { + $bin_accel = ($this->input_ instanceof TBinaryProtocolAccelerated) && function_exists('thrift_protocol_read_binary'); + if ($bin_accel) $result = thrift_protocol_read_binary($this->input_, '\metastore\ThriftHiveMetastore_get_lock_materialization_rebuild_result', $this->input_->isStrictRead()); + else + { + $rseqid = 0; + $fname = null; + $mtype = 0; + + $this->input_->readMessageBegin($fname, $mtype, $rseqid); + if ($mtype == TMessageType::EXCEPTION) { + $x = new TApplicationException(); + $x->read($this->input_); + $this->input_->readMessageEnd(); + throw $x; + } + $result = new \metastore\ThriftHiveMetastore_get_lock_materialization_rebuild_result(); + $result->read($this->input_); + $this->input_->readMessageEnd(); + } + if ($result->success !== null) { + return $result->success; + } + throw new \Exception("get_lock_materialization_rebuild failed: unknown result"); + } + + public function heartbeat_lock_materialization_rebuild($dbName, $tableName, $txnId) + { + $this->send_heartbeat_lock_materialization_rebuild($dbName, $tableName, $txnId); + return $this->recv_heartbeat_lock_materialization_rebuild(); + } + + public function send_heartbeat_lock_materialization_rebuild($dbName, $tableName, $txnId) + { + $args = new \metastore\ThriftHiveMetastore_heartbeat_lock_materialization_rebuild_args(); + $args->dbName = $dbName; + $args->tableName = $tableName; + $args->txnId = $txnId; + $bin_accel = ($this->output_ instanceof 
TBinaryProtocolAccelerated) && function_exists('thrift_protocol_write_binary'); + if ($bin_accel) + { + thrift_protocol_write_binary($this->output_, 'heartbeat_lock_materialization_rebuild', TMessageType::CALL, $args, $this->seqid_, $this->output_->isStrictWrite()); + } + else + { + $this->output_->writeMessageBegin('heartbeat_lock_materialization_rebuild', TMessageType::CALL, $this->seqid_); + $args->write($this->output_); + $this->output_->writeMessageEnd(); + $this->output_->getTransport()->flush(); + } + } + + public function recv_heartbeat_lock_materialization_rebuild() + { + $bin_accel = ($this->input_ instanceof TBinaryProtocolAccelerated) && function_exists('thrift_protocol_read_binary'); + if ($bin_accel) $result = thrift_protocol_read_binary($this->input_, '\metastore\ThriftHiveMetastore_heartbeat_lock_materialization_rebuild_result', $this->input_->isStrictRead()); + else + { + $rseqid = 0; + $fname = null; + $mtype = 0; + + $this->input_->readMessageBegin($fname, $mtype, $rseqid); + if ($mtype == TMessageType::EXCEPTION) { + $x = new TApplicationException(); + $x->read($this->input_); + $this->input_->readMessageEnd(); + throw $x; + } + $result = new \metastore\ThriftHiveMetastore_heartbeat_lock_materialization_rebuild_result(); + $result->read($this->input_); + $this->input_->readMessageEnd(); + } + if ($result->success !== null) { + return $result->success; + } + throw new \Exception("heartbeat_lock_materialization_rebuild failed: unknown result"); + } + } // HELPER FUNCTIONS AND STRUCTURES @@ -57981,4 +58101,401 @@ class ThriftHiveMetastore_get_serde_result { } +class ThriftHiveMetastore_get_lock_materialization_rebuild_args { + static $_TSPEC; + + /** + * @var string + */ + public $dbName = null; + /** + * @var string + */ + public $tableName = null; + /** + * @var int + */ + public $txnId = null; + + public function __construct($vals=null) { + if (!isset(self::$_TSPEC)) { + self::$_TSPEC = array( + 1 => array( + 'var' => 'dbName', + 'type' => 
TType::STRING, + ), + 2 => array( + 'var' => 'tableName', + 'type' => TType::STRING, + ), + 3 => array( + 'var' => 'txnId', + 'type' => TType::I64, + ), + ); + } + if (is_array($vals)) { + if (isset($vals['dbName'])) { + $this->dbName = $vals['dbName']; + } + if (isset($vals['tableName'])) { + $this->tableName = $vals['tableName']; + } + if (isset($vals['txnId'])) { + $this->txnId = $vals['txnId']; + } + } + } + + public function getName() { + return 'ThriftHiveMetastore_get_lock_materialization_rebuild_args'; + } + + public function read($input) + { + $xfer = 0; + $fname = null; + $ftype = 0; + $fid = 0; + $xfer += $input->readStructBegin($fname); + while (true) + { + $xfer += $input->readFieldBegin($fname, $ftype, $fid); + if ($ftype == TType::STOP) { + break; + } + switch ($fid) + { + case 1: + if ($ftype == TType::STRING) { + $xfer += $input->readString($this->dbName); + } else { + $xfer += $input->skip($ftype); + } + break; + case 2: + if ($ftype == TType::STRING) { + $xfer += $input->readString($this->tableName); + } else { + $xfer += $input->skip($ftype); + } + break; + case 3: + if ($ftype == TType::I64) { + $xfer += $input->readI64($this->txnId); + } else { + $xfer += $input->skip($ftype); + } + break; + default: + $xfer += $input->skip($ftype); + break; + } + $xfer += $input->readFieldEnd(); + } + $xfer += $input->readStructEnd(); + return $xfer; + } + + public function write($output) { + $xfer = 0; + $xfer += $output->writeStructBegin('ThriftHiveMetastore_get_lock_materialization_rebuild_args'); + if ($this->dbName !== null) { + $xfer += $output->writeFieldBegin('dbName', TType::STRING, 1); + $xfer += $output->writeString($this->dbName); + $xfer += $output->writeFieldEnd(); + } + if ($this->tableName !== null) { + $xfer += $output->writeFieldBegin('tableName', TType::STRING, 2); + $xfer += $output->writeString($this->tableName); + $xfer += $output->writeFieldEnd(); + } + if ($this->txnId !== null) { + $xfer += $output->writeFieldBegin('txnId', 
TType::I64, 3); + $xfer += $output->writeI64($this->txnId); + $xfer += $output->writeFieldEnd(); + } + $xfer += $output->writeFieldStop(); + $xfer += $output->writeStructEnd(); + return $xfer; + } + +} + +class ThriftHiveMetastore_get_lock_materialization_rebuild_result { + static $_TSPEC; + + /** + * @var \metastore\LockResponse + */ + public $success = null; + + public function __construct($vals=null) { + if (!isset(self::$_TSPEC)) { + self::$_TSPEC = array( + 0 => array( + 'var' => 'success', + 'type' => TType::STRUCT, + 'class' => '\metastore\LockResponse', + ), + ); + } + if (is_array($vals)) { + if (isset($vals['success'])) { + $this->success = $vals['success']; + } + } + } + + public function getName() { + return 'ThriftHiveMetastore_get_lock_materialization_rebuild_result'; + } + + public function read($input) + { + $xfer = 0; + $fname = null; + $ftype = 0; + $fid = 0; + $xfer += $input->readStructBegin($fname); + while (true) + { + $xfer += $input->readFieldBegin($fname, $ftype, $fid); + if ($ftype == TType::STOP) { + break; + } + switch ($fid) + { + case 0: + if ($ftype == TType::STRUCT) { + $this->success = new \metastore\LockResponse(); + $xfer += $this->success->read($input); + } else { + $xfer += $input->skip($ftype); + } + break; + default: + $xfer += $input->skip($ftype); + break; + } + $xfer += $input->readFieldEnd(); + } + $xfer += $input->readStructEnd(); + return $xfer; + } + + public function write($output) { + $xfer = 0; + $xfer += $output->writeStructBegin('ThriftHiveMetastore_get_lock_materialization_rebuild_result'); + if ($this->success !== null) { + if (!is_object($this->success)) { + throw new TProtocolException('Bad type in structure.', TProtocolException::INVALID_DATA); + } + $xfer += $output->writeFieldBegin('success', TType::STRUCT, 0); + $xfer += $this->success->write($output); + $xfer += $output->writeFieldEnd(); + } + $xfer += $output->writeFieldStop(); + $xfer += $output->writeStructEnd(); + return $xfer; + } + +} + +class 
ThriftHiveMetastore_heartbeat_lock_materialization_rebuild_args { + static $_TSPEC; + + /** + * @var string + */ + public $dbName = null; + /** + * @var string + */ + public $tableName = null; + /** + * @var int + */ + public $txnId = null; + + public function __construct($vals=null) { + if (!isset(self::$_TSPEC)) { + self::$_TSPEC = array( + 1 => array( + 'var' => 'dbName', + 'type' => TType::STRING, + ), + 2 => array( + 'var' => 'tableName', + 'type' => TType::STRING, + ), + 3 => array( + 'var' => 'txnId', + 'type' => TType::I64, + ), + ); + } + if (is_array($vals)) { + if (isset($vals['dbName'])) { + $this->dbName = $vals['dbName']; + } + if (isset($vals['tableName'])) { + $this->tableName = $vals['tableName']; + } + if (isset($vals['txnId'])) { + $this->txnId = $vals['txnId']; + } + } + } + + public function getName() { + return 'ThriftHiveMetastore_heartbeat_lock_materialization_rebuild_args'; + } + + public function read($input) + { + $xfer = 0; + $fname = null; + $ftype = 0; + $fid = 0; + $xfer += $input->readStructBegin($fname); + while (true) + { + $xfer += $input->readFieldBegin($fname, $ftype, $fid); + if ($ftype == TType::STOP) { + break; + } + switch ($fid) + { + case 1: + if ($ftype == TType::STRING) { + $xfer += $input->readString($this->dbName); + } else { + $xfer += $input->skip($ftype); + } + break; + case 2: + if ($ftype == TType::STRING) { + $xfer += $input->readString($this->tableName); + } else { + $xfer += $input->skip($ftype); + } + break; + case 3: + if ($ftype == TType::I64) { + $xfer += $input->readI64($this->txnId); + } else { + $xfer += $input->skip($ftype); + } + break; + default: + $xfer += $input->skip($ftype); + break; + } + $xfer += $input->readFieldEnd(); + } + $xfer += $input->readStructEnd(); + return $xfer; + } + + public function write($output) { + $xfer = 0; + $xfer += $output->writeStructBegin('ThriftHiveMetastore_heartbeat_lock_materialization_rebuild_args'); + if ($this->dbName !== null) { + $xfer += 
$output->writeFieldBegin('dbName', TType::STRING, 1); + $xfer += $output->writeString($this->dbName); + $xfer += $output->writeFieldEnd(); + } + if ($this->tableName !== null) { + $xfer += $output->writeFieldBegin('tableName', TType::STRING, 2); + $xfer += $output->writeString($this->tableName); + $xfer += $output->writeFieldEnd(); + } + if ($this->txnId !== null) { + $xfer += $output->writeFieldBegin('txnId', TType::I64, 3); + $xfer += $output->writeI64($this->txnId); + $xfer += $output->writeFieldEnd(); + } + $xfer += $output->writeFieldStop(); + $xfer += $output->writeStructEnd(); + return $xfer; + } + +} + +class ThriftHiveMetastore_heartbeat_lock_materialization_rebuild_result { + static $_TSPEC; + + /** + * @var bool + */ + public $success = null; + + public function __construct($vals=null) { + if (!isset(self::$_TSPEC)) { + self::$_TSPEC = array( + 0 => array( + 'var' => 'success', + 'type' => TType::BOOL, + ), + ); + } + if (is_array($vals)) { + if (isset($vals['success'])) { + $this->success = $vals['success']; + } + } + } + + public function getName() { + return 'ThriftHiveMetastore_heartbeat_lock_materialization_rebuild_result'; + } + + public function read($input) + { + $xfer = 0; + $fname = null; + $ftype = 0; + $fid = 0; + $xfer += $input->readStructBegin($fname); + while (true) + { + $xfer += $input->readFieldBegin($fname, $ftype, $fid); + if ($ftype == TType::STOP) { + break; + } + switch ($fid) + { + case 0: + if ($ftype == TType::BOOL) { + $xfer += $input->readBool($this->success); + } else { + $xfer += $input->skip($ftype); + } + break; + default: + $xfer += $input->skip($ftype); + break; + } + $xfer += $input->readFieldEnd(); + } + $xfer += $input->readStructEnd(); + return $xfer; + } + + public function write($output) { + $xfer = 0; + $xfer += $output->writeStructBegin('ThriftHiveMetastore_heartbeat_lock_materialization_rebuild_result'); + if ($this->success !== null) { + $xfer += $output->writeFieldBegin('success', TType::BOOL, 0); + $xfer += 
$output->writeBool($this->success); + $xfer += $output->writeFieldEnd(); + } + $xfer += $output->writeFieldStop(); + $xfer += $output->writeStructEnd(); + return $xfer; + } + +} +
http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/gen/thrift/gen-php/metastore/Types.php ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/gen/thrift/gen-php/metastore/Types.php b/standalone-metastore/src/gen/thrift/gen-php/metastore/Types.php index d4fcc88..14416b4 100644 --- a/standalone-metastore/src/gen/thrift/gen-php/metastore/Types.php +++ b/standalone-metastore/src/gen/thrift/gen-php/metastore/Types.php @@ -23975,6 +23975,10 @@ class Materialization { * @var int */ public $invalidationTime = null; + /** + * @var bool + */ + public $sourceTablesUpdateDeleteModified = null; public function __construct($vals=null) { if (!isset(self::$_TSPEC)) { @@ -23995,6 +23999,10 @@ class Materialization { 'var' => 'invalidationTime', 'type' => TType::I64, ), + 4 => array( + 'var' => 'sourceTablesUpdateDeleteModified', + 'type' => TType::BOOL, + ), ); } if (is_array($vals)) { @@ -24007,6 +24015,9 @@ class Materialization { if (isset($vals['invalidationTime'])) { $this->invalidationTime = $vals['invalidationTime']; } + if (isset($vals['sourceTablesUpdateDeleteModified'])) { + $this->sourceTablesUpdateDeleteModified = $vals['sourceTablesUpdateDeleteModified']; + } } } @@ -24064,6 +24075,13 @@ class Materialization { $xfer += $input->skip($ftype); } break; + case 4: + if ($ftype == TType::BOOL) { + $xfer += $input->readBool($this->sourceTablesUpdateDeleteModified); + } else { + $xfer += $input->skip($ftype); + } + break; default: $xfer += $input->skip($ftype); break; @@ -24108,6 +24126,11 @@ class Materialization { $xfer += $output->writeI64($this->invalidationTime); $xfer += $output->writeFieldEnd(); } + if ($this->sourceTablesUpdateDeleteModified !== null) { + $xfer += $output->writeFieldBegin('sourceTablesUpdateDeleteModified', TType::BOOL, 4); + $xfer += $output->writeBool($this->sourceTablesUpdateDeleteModified); + $xfer += $output->writeFieldEnd(); + } $xfer += 
$output->writeFieldStop(); $xfer += $output->writeStructEnd(); return $xfer; http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore-remote ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore-remote b/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore-remote index d39690f..079c7fc 100755 --- a/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore-remote +++ b/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore-remote @@ -224,6 +224,8 @@ if len(sys.argv) <= 1 or sys.argv[1] == '--help': print(' void set_schema_version_state(SetSchemaVersionStateRequest rqst)') print(' void add_serde(SerDeInfo serde)') print(' SerDeInfo get_serde(GetSerdeRequest rqst)') + print(' LockResponse get_lock_materialization_rebuild(string dbName, string tableName, i64 txnId)') + print(' bool heartbeat_lock_materialization_rebuild(string dbName, string tableName, i64 txnId)') print(' string getName()') print(' string getVersion()') print(' fb_status getStatus()') @@ -1493,6 +1495,18 @@ elif cmd == 'get_serde': sys.exit(1) pp.pprint(client.get_serde(eval(args[0]),)) +elif cmd == 'get_lock_materialization_rebuild': + if len(args) != 3: + print('get_lock_materialization_rebuild requires 3 args') + sys.exit(1) + pp.pprint(client.get_lock_materialization_rebuild(args[0],args[1],eval(args[2]),)) + +elif cmd == 'heartbeat_lock_materialization_rebuild': + if len(args) != 3: + print('heartbeat_lock_materialization_rebuild requires 3 args') + sys.exit(1) + pp.pprint(client.heartbeat_lock_materialization_rebuild(args[0],args[1],eval(args[2]),)) + elif cmd == 'getName': if len(args) != 0: print('getName requires 0 args') 
http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore.py ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore.py b/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore.py index f8ffeac..b0e64d8 100644 --- a/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore.py +++ b/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore.py @@ -1548,6 +1548,24 @@ class Iface(fb303.FacebookService.Iface): """ pass + def get_lock_materialization_rebuild(self, dbName, tableName, txnId): + """ + Parameters: + - dbName + - tableName + - txnId + """ + pass + + def heartbeat_lock_materialization_rebuild(self, dbName, tableName, txnId): + """ + Parameters: + - dbName + - tableName + - txnId + """ + pass + class Client(fb303.FacebookService.Client, Iface): """ @@ -8711,6 +8729,76 @@ class Client(fb303.FacebookService.Client, Iface): raise result.o2 raise TApplicationException(TApplicationException.MISSING_RESULT, "get_serde failed: unknown result") + def get_lock_materialization_rebuild(self, dbName, tableName, txnId): + """ + Parameters: + - dbName + - tableName + - txnId + """ + self.send_get_lock_materialization_rebuild(dbName, tableName, txnId) + return self.recv_get_lock_materialization_rebuild() + + def send_get_lock_materialization_rebuild(self, dbName, tableName, txnId): + self._oprot.writeMessageBegin('get_lock_materialization_rebuild', TMessageType.CALL, self._seqid) + args = get_lock_materialization_rebuild_args() + args.dbName = dbName + args.tableName = tableName + args.txnId = txnId + args.write(self._oprot) + self._oprot.writeMessageEnd() + self._oprot.trans.flush() + + def recv_get_lock_materialization_rebuild(self): + iprot = self._iprot + (fname, mtype, rseqid) = iprot.readMessageBegin() + if mtype == 
TMessageType.EXCEPTION: + x = TApplicationException() + x.read(iprot) + iprot.readMessageEnd() + raise x + result = get_lock_materialization_rebuild_result() + result.read(iprot) + iprot.readMessageEnd() + if result.success is not None: + return result.success + raise TApplicationException(TApplicationException.MISSING_RESULT, "get_lock_materialization_rebuild failed: unknown result") + + def heartbeat_lock_materialization_rebuild(self, dbName, tableName, txnId): + """ + Parameters: + - dbName + - tableName + - txnId + """ + self.send_heartbeat_lock_materialization_rebuild(dbName, tableName, txnId) + return self.recv_heartbeat_lock_materialization_rebuild() + + def send_heartbeat_lock_materialization_rebuild(self, dbName, tableName, txnId): + self._oprot.writeMessageBegin('heartbeat_lock_materialization_rebuild', TMessageType.CALL, self._seqid) + args = heartbeat_lock_materialization_rebuild_args() + args.dbName = dbName + args.tableName = tableName + args.txnId = txnId + args.write(self._oprot) + self._oprot.writeMessageEnd() + self._oprot.trans.flush() + + def recv_heartbeat_lock_materialization_rebuild(self): + iprot = self._iprot + (fname, mtype, rseqid) = iprot.readMessageBegin() + if mtype == TMessageType.EXCEPTION: + x = TApplicationException() + x.read(iprot) + iprot.readMessageEnd() + raise x + result = heartbeat_lock_materialization_rebuild_result() + result.read(iprot) + iprot.readMessageEnd() + if result.success is not None: + return result.success + raise TApplicationException(TApplicationException.MISSING_RESULT, "heartbeat_lock_materialization_rebuild failed: unknown result") + class Processor(fb303.FacebookService.Processor, Iface, TProcessor): def __init__(self, handler): @@ -8915,6 +9003,8 @@ class Processor(fb303.FacebookService.Processor, Iface, TProcessor): self._processMap["set_schema_version_state"] = Processor.process_set_schema_version_state self._processMap["add_serde"] = Processor.process_add_serde self._processMap["get_serde"] = 
Processor.process_get_serde + self._processMap["get_lock_materialization_rebuild"] = Processor.process_get_lock_materialization_rebuild + self._processMap["heartbeat_lock_materialization_rebuild"] = Processor.process_heartbeat_lock_materialization_rebuild def process(self, iprot, oprot): (name, type, seqid) = iprot.readMessageBegin() @@ -13889,6 +13979,44 @@ class Processor(fb303.FacebookService.Processor, Iface, TProcessor): oprot.writeMessageEnd() oprot.trans.flush() + def process_get_lock_materialization_rebuild(self, seqid, iprot, oprot): + args = get_lock_materialization_rebuild_args() + args.read(iprot) + iprot.readMessageEnd() + result = get_lock_materialization_rebuild_result() + try: + result.success = self._handler.get_lock_materialization_rebuild(args.dbName, args.tableName, args.txnId) + msg_type = TMessageType.REPLY + except (TTransport.TTransportException, KeyboardInterrupt, SystemExit): + raise + except Exception as ex: + msg_type = TMessageType.EXCEPTION + logging.exception(ex) + result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error') + oprot.writeMessageBegin("get_lock_materialization_rebuild", msg_type, seqid) + result.write(oprot) + oprot.writeMessageEnd() + oprot.trans.flush() + + def process_heartbeat_lock_materialization_rebuild(self, seqid, iprot, oprot): + args = heartbeat_lock_materialization_rebuild_args() + args.read(iprot) + iprot.readMessageEnd() + result = heartbeat_lock_materialization_rebuild_result() + try: + result.success = self._handler.heartbeat_lock_materialization_rebuild(args.dbName, args.tableName, args.txnId) + msg_type = TMessageType.REPLY + except (TTransport.TTransportException, KeyboardInterrupt, SystemExit): + raise + except Exception as ex: + msg_type = TMessageType.EXCEPTION + logging.exception(ex) + result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error') + oprot.writeMessageBegin("heartbeat_lock_materialization_rebuild", msg_type, seqid) + 
result.write(oprot) + oprot.writeMessageEnd() + oprot.trans.flush() + # HELPER FUNCTIONS AND STRUCTURES @@ -47281,3 +47409,314 @@ class get_serde_result: def __ne__(self, other): return not (self == other) + +class get_lock_materialization_rebuild_args: + """ + Attributes: + - dbName + - tableName + - txnId + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRING, 'dbName', None, None, ), # 1 + (2, TType.STRING, 'tableName', None, None, ), # 2 + (3, TType.I64, 'txnId', None, None, ), # 3 + ) + + def __init__(self, dbName=None, tableName=None, txnId=None,): + self.dbName = dbName + self.tableName = tableName + self.txnId = txnId + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.dbName = iprot.readString() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.STRING: + self.tableName = iprot.readString() + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.I64: + self.txnId = iprot.readI64() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('get_lock_materialization_rebuild_args') + if self.dbName is not None: + oprot.writeFieldBegin('dbName', TType.STRING, 1) + oprot.writeString(self.dbName) + oprot.writeFieldEnd() + if self.tableName is not None: + oprot.writeFieldBegin('tableName', TType.STRING, 2) 
+ oprot.writeString(self.tableName) + oprot.writeFieldEnd() + if self.txnId is not None: + oprot.writeFieldBegin('txnId', TType.I64, 3) + oprot.writeI64(self.txnId) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.dbName) + value = (value * 31) ^ hash(self.tableName) + value = (value * 31) ^ hash(self.txnId) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class get_lock_materialization_rebuild_result: + """ + Attributes: + - success + """ + + thrift_spec = ( + (0, TType.STRUCT, 'success', (LockResponse, LockResponse.thrift_spec), None, ), # 0 + ) + + def __init__(self, success=None,): + self.success = success + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 0: + if ftype == TType.STRUCT: + self.success = LockResponse() + self.success.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('get_lock_materialization_rebuild_result') + 
if self.success is not None: + oprot.writeFieldBegin('success', TType.STRUCT, 0) + self.success.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.success) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class heartbeat_lock_materialization_rebuild_args: + """ + Attributes: + - dbName + - tableName + - txnId + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRING, 'dbName', None, None, ), # 1 + (2, TType.STRING, 'tableName', None, None, ), # 2 + (3, TType.I64, 'txnId', None, None, ), # 3 + ) + + def __init__(self, dbName=None, tableName=None, txnId=None,): + self.dbName = dbName + self.tableName = tableName + self.txnId = txnId + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.dbName = iprot.readString() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.STRING: + self.tableName = iprot.readString() + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.I64: + self.txnId = iprot.readI64() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and 
self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('heartbeat_lock_materialization_rebuild_args') + if self.dbName is not None: + oprot.writeFieldBegin('dbName', TType.STRING, 1) + oprot.writeString(self.dbName) + oprot.writeFieldEnd() + if self.tableName is not None: + oprot.writeFieldBegin('tableName', TType.STRING, 2) + oprot.writeString(self.tableName) + oprot.writeFieldEnd() + if self.txnId is not None: + oprot.writeFieldBegin('txnId', TType.I64, 3) + oprot.writeI64(self.txnId) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.dbName) + value = (value * 31) ^ hash(self.tableName) + value = (value * 31) ^ hash(self.txnId) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class heartbeat_lock_materialization_rebuild_result: + """ + Attributes: + - success + """ + + thrift_spec = ( + (0, TType.BOOL, 'success', None, None, ), # 0 + ) + + def __init__(self, success=None,): + self.success = success + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 0: + if ftype == TType.BOOL: + self.success = iprot.readBool() + else: + iprot.skip(ftype) + 
else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('heartbeat_lock_materialization_rebuild_result') + if self.success is not None: + oprot.writeFieldBegin('success', TType.BOOL, 0) + oprot.writeBool(self.success) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.success) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py index 972db1f..f2f61e0 100644 --- a/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py +++ b/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py @@ -16941,6 +16941,7 @@ class Materialization: - tablesUsed - validTxnList - invalidationTime + - sourceTablesUpdateDeleteModified """ thrift_spec = ( @@ -16948,12 +16949,14 @@ class Materialization: (1, TType.SET, 'tablesUsed', (TType.STRING,None), None, ), # 1 (2, TType.STRING, 'validTxnList', None, None, ), # 2 (3, TType.I64, 'invalidationTime', None, None, ), # 3 + (4, TType.BOOL, 'sourceTablesUpdateDeleteModified', 
None, None, ), # 4 ) - def __init__(self, tablesUsed=None, validTxnList=None, invalidationTime=None,): + def __init__(self, tablesUsed=None, validTxnList=None, invalidationTime=None, sourceTablesUpdateDeleteModified=None,): self.tablesUsed = tablesUsed self.validTxnList = validTxnList self.invalidationTime = invalidationTime + self.sourceTablesUpdateDeleteModified = sourceTablesUpdateDeleteModified def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -16984,6 +16987,11 @@ class Materialization: self.invalidationTime = iprot.readI64() else: iprot.skip(ftype) + elif fid == 4: + if ftype == TType.BOOL: + self.sourceTablesUpdateDeleteModified = iprot.readBool() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -17009,14 +17017,16 @@ class Materialization: oprot.writeFieldBegin('invalidationTime', TType.I64, 3) oprot.writeI64(self.invalidationTime) oprot.writeFieldEnd() + if self.sourceTablesUpdateDeleteModified is not None: + oprot.writeFieldBegin('sourceTablesUpdateDeleteModified', TType.BOOL, 4) + oprot.writeBool(self.sourceTablesUpdateDeleteModified) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() def validate(self): if self.tablesUsed is None: raise TProtocol.TProtocolException(message='Required field tablesUsed is unset!') - if self.invalidationTime is None: - raise TProtocol.TProtocolException(message='Required field invalidationTime is unset!') return @@ -17025,6 +17035,7 @@ class Materialization: value = (value * 31) ^ hash(self.tablesUsed) value = (value * 31) ^ hash(self.validTxnList) value = (value * 31) ^ hash(self.invalidationTime) + value = (value * 31) ^ hash(self.sourceTablesUpdateDeleteModified) return value def __repr__(self): http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb 
---------------------------------------------------------------------- diff --git a/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb index 94454a1..0e70e89 100644 --- a/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ b/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ -3814,18 +3814,19 @@ class Materialization TABLESUSED = 1 VALIDTXNLIST = 2 INVALIDATIONTIME = 3 + SOURCETABLESUPDATEDELETEMODIFIED = 4 FIELDS = { TABLESUSED => {:type => ::Thrift::Types::SET, :name => 'tablesUsed', :element => {:type => ::Thrift::Types::STRING}}, VALIDTXNLIST => {:type => ::Thrift::Types::STRING, :name => 'validTxnList', :optional => true}, - INVALIDATIONTIME => {:type => ::Thrift::Types::I64, :name => 'invalidationTime'} + INVALIDATIONTIME => {:type => ::Thrift::Types::I64, :name => 'invalidationTime', :optional => true}, + SOURCETABLESUPDATEDELETEMODIFIED => {:type => ::Thrift::Types::BOOL, :name => 'sourceTablesUpdateDeleteModified', :optional => true} } def struct_fields; FIELDS; end def validate raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field tablesUsed is unset!') unless @tablesUsed - raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field invalidationTime is unset!') unless @invalidationTime end ::Thrift::Struct.generate_accessors self http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb b/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb index c103675..58ebd29 100644 --- a/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb +++ b/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb @@ -3348,6 +3348,36 
@@ module ThriftHiveMetastore raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'get_serde failed: unknown result') end + def get_lock_materialization_rebuild(dbName, tableName, txnId) + send_get_lock_materialization_rebuild(dbName, tableName, txnId) + return recv_get_lock_materialization_rebuild() + end + + def send_get_lock_materialization_rebuild(dbName, tableName, txnId) + send_message('get_lock_materialization_rebuild', Get_lock_materialization_rebuild_args, :dbName => dbName, :tableName => tableName, :txnId => txnId) + end + + def recv_get_lock_materialization_rebuild() + result = receive_message(Get_lock_materialization_rebuild_result) + return result.success unless result.success.nil? + raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'get_lock_materialization_rebuild failed: unknown result') + end + + def heartbeat_lock_materialization_rebuild(dbName, tableName, txnId) + send_heartbeat_lock_materialization_rebuild(dbName, tableName, txnId) + return recv_heartbeat_lock_materialization_rebuild() + end + + def send_heartbeat_lock_materialization_rebuild(dbName, tableName, txnId) + send_message('heartbeat_lock_materialization_rebuild', Heartbeat_lock_materialization_rebuild_args, :dbName => dbName, :tableName => tableName, :txnId => txnId) + end + + def recv_heartbeat_lock_materialization_rebuild() + result = receive_message(Heartbeat_lock_materialization_rebuild_result) + return result.success unless result.success.nil? 
+ raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'heartbeat_lock_materialization_rebuild failed: unknown result') + end + end class Processor < ::FacebookService::Processor @@ -5875,6 +5905,20 @@ module ThriftHiveMetastore write_result(result, oprot, 'get_serde', seqid) end + def process_get_lock_materialization_rebuild(seqid, iprot, oprot) + args = read_args(iprot, Get_lock_materialization_rebuild_args) + result = Get_lock_materialization_rebuild_result.new() + result.success = @handler.get_lock_materialization_rebuild(args.dbName, args.tableName, args.txnId) + write_result(result, oprot, 'get_lock_materialization_rebuild', seqid) + end + + def process_heartbeat_lock_materialization_rebuild(seqid, iprot, oprot) + args = read_args(iprot, Heartbeat_lock_materialization_rebuild_args) + result = Heartbeat_lock_materialization_rebuild_result.new() + result.success = @handler.heartbeat_lock_materialization_rebuild(args.dbName, args.tableName, args.txnId) + write_result(result, oprot, 'heartbeat_lock_materialization_rebuild', seqid) + end + end # HELPER FUNCTIONS AND STRUCTURES @@ -13301,5 +13345,77 @@ module ThriftHiveMetastore ::Thrift::Struct.generate_accessors self end + class Get_lock_materialization_rebuild_args + include ::Thrift::Struct, ::Thrift::Struct_Union + DBNAME = 1 + TABLENAME = 2 + TXNID = 3 + + FIELDS = { + DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName'}, + TABLENAME => {:type => ::Thrift::Types::STRING, :name => 'tableName'}, + TXNID => {:type => ::Thrift::Types::I64, :name => 'txnId'} + } + + def struct_fields; FIELDS; end + + def validate + end + + ::Thrift::Struct.generate_accessors self + end + + class Get_lock_materialization_rebuild_result + include ::Thrift::Struct, ::Thrift::Struct_Union + SUCCESS = 0 + + FIELDS = { + SUCCESS => {:type => ::Thrift::Types::STRUCT, :name => 'success', :class => ::LockResponse} + } + + def struct_fields; FIELDS; end + + def validate + end + + 
::Thrift::Struct.generate_accessors self + end + + class Heartbeat_lock_materialization_rebuild_args + include ::Thrift::Struct, ::Thrift::Struct_Union + DBNAME = 1 + TABLENAME = 2 + TXNID = 3 + + FIELDS = { + DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName'}, + TABLENAME => {:type => ::Thrift::Types::STRING, :name => 'tableName'}, + TXNID => {:type => ::Thrift::Types::I64, :name => 'txnId'} + } + + def struct_fields; FIELDS; end + + def validate + end + + ::Thrift::Struct.generate_accessors self + end + + class Heartbeat_lock_materialization_rebuild_result + include ::Thrift::Struct, ::Thrift::Struct_Union + SUCCESS = 0 + + FIELDS = { + SUCCESS => {:type => ::Thrift::Types::BOOL, :name => 'success'} + } + + def struct_fields; FIELDS; end + + def validate + end + + ::Thrift::Struct.generate_accessors self + end + end http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index c81b8fa..30922ba 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -8367,6 +8367,18 @@ public class HiveMetaStore extends ThriftHiveMetastore { endFunction("get_serde", serde != null, ex); } } + + @Override + public LockResponse get_lock_materialization_rebuild(String dbName, String tableName, long txnId) + throws TException { + return MaterializationsRebuildLockHandler.get().lockResource(dbName, tableName, txnId); + } + + @Override + public boolean heartbeat_lock_materialization_rebuild(String dbName, String tableName, long txnId) + throws TException { + return 
MaterializationsRebuildLockHandler.get().refreshLockResource(dbName, tableName, txnId); + } } private static IHMSHandler newRetryingHMSHandler(IHMSHandler baseHandler, Configuration conf) http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index ebbf465..95a3767 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -3127,4 +3127,14 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable { else if (max <= Short.MAX_VALUE) return (short)max; else return Short.MAX_VALUE; } + + @Override + public LockResponse lockMaterializationRebuild(String dbName, String tableName, long txnId) throws TException { + return client.get_lock_materialization_rebuild(dbName, tableName, txnId); + } + + @Override + public boolean heartbeatLockMaterializationRebuild(String dbName, String tableName, long txnId) throws TException { + return client.heartbeat_lock_materialization_rebuild(dbName, tableName, txnId); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java index b2c40c2..98674cf 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java +++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java @@ -3613,4 +3613,25 @@ public interface IMetaStoreClient { * @throws TException general thrift error */ SerDeInfo getSerDe(String serDeName) throws TException; + + /** + * Acquire the materialization rebuild lock for a given view. We need to specify the fully + * qualified name of the materialized view and the open transaction ID so we can identify + * uniquely the lock. + * @param dbName db name for the materialized view + * @param tableName table name for the materialized view + * @param txnId transaction id for the rebuild + * @return the response from the metastore, where the lock id is equal to the txn id and + * the status can be either ACQUIRED or NOT ACQUIRED + */ + LockResponse lockMaterializationRebuild(String dbName, String tableName, long txnId) throws TException; + + /** + * Method to refresh the acquisition of a given materialization rebuild lock. + * @param dbName db name for the materialized view + * @param tableName table name for the materialized view + * @param txnId transaction id for the rebuild + * @return true if the lock could be renewed, false otherwise + */ + boolean heartbeatLockMaterializationRebuild(String dbName, String tableName, long txnId) throws TException; } http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationInvalidationInfo.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationInvalidationInfo.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationInvalidationInfo.java deleted file mode 100644 index 3d77407..0000000 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationInvalidationInfo.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software 
Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.metastore; - -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.hadoop.hive.metastore.api.Materialization; -import org.apache.hadoop.hive.metastore.api.Table; - -/** - * Contains information about the invalidation of a materialization, - * including the materialization name, the tables that it uses, and - * the invalidation time, i.e., the first moment t0 after the - * materialization was created at which one of the tables that it uses - * was modified. 
- */ -@SuppressWarnings("serial") -public class MaterializationInvalidationInfo extends Materialization { - - private AtomicLong invalidationTime; - - public MaterializationInvalidationInfo(Set<String> tablesUsed, String validTxnList) { - super(tablesUsed, 0); - this.setValidTxnList(validTxnList); - this.invalidationTime = new AtomicLong(0); - } - - public boolean compareAndSetInvalidationTime(long expect, long update) { - boolean success = invalidationTime.compareAndSet(expect, update); - if (success) { - super.setInvalidationTime(update); - } - return success; - } - - public long getInvalidationTime() { - return invalidationTime.get(); - } - - public void setInvalidationTime(long invalidationTime) { - throw new UnsupportedOperationException("You should call compareAndSetInvalidationTime instead"); - } - -} http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java index 80cb1de..99c5abc 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java @@ -26,15 +26,17 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.common.ValidReadTxnList; -import 
org.apache.hadoop.hive.common.ValidTxnList; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; +import org.apache.hadoop.hive.common.ValidTxnWriteIdList; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.api.BasicTxnInfo; +import org.apache.hadoop.hive.metastore.api.LockResponse; import org.apache.hadoop.hive.metastore.api.Materialization; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Table; @@ -65,10 +67,12 @@ public final class MaterializationsInvalidationCache { /* Key is the database name. Each value is a map from the unique view qualified name to * the materialization invalidation info. This invalidation object contains information - * such as the tables used by the materialized view or the invalidation time, i.e., first - * modification of the tables used by materialized view after the view was created. */ - private final ConcurrentMap<String, ConcurrentMap<String, MaterializationInvalidationInfo>> materializations = - new ConcurrentHashMap<String, ConcurrentMap<String, MaterializationInvalidationInfo>>(); + * such as the tables used by the materialized view, whether there was any update or + * delete in the source tables since the materialized view was created or rebuilt, + * or the invalidation time, i.e., first modification of the tables used by materialized + * view after the view was created. */ + private final ConcurrentMap<String, ConcurrentMap<String, Materialization>> materializations = + new ConcurrentHashMap<>(); /* * Key is a qualified table name. The value is a (sorted) tree map (supporting concurrent @@ -77,7 +81,10 @@ public final class MaterializationsInvalidationCache { * materialization. 
*/ private final ConcurrentMap<String, ConcurrentSkipListMap<Long, Long>> tableModifications = - new ConcurrentHashMap<String, ConcurrentSkipListMap<Long, Long>>(); + new ConcurrentHashMap<>(); + + private final ConcurrentMap<String, ConcurrentSkipListSet<Long>> updateDeleteTableModifications = + new ConcurrentHashMap<>(); /* Whether the cache has been initialized or not. */ private boolean initialized; @@ -188,9 +195,9 @@ public final class MaterializationsInvalidationCache { return; } // We are going to create the map for each view in the given database - ConcurrentMap<String, MaterializationInvalidationInfo> cq = - new ConcurrentHashMap<String, MaterializationInvalidationInfo>(); - final ConcurrentMap<String, MaterializationInvalidationInfo> prevCq = materializations.putIfAbsent( + ConcurrentMap<String, Materialization> cq = + new ConcurrentHashMap<String, Materialization>(); + final ConcurrentMap<String, Materialization> prevCq = materializations.putIfAbsent( dbName, cq); if (prevCq != null) { cq = prevCq; @@ -204,13 +211,15 @@ public final class MaterializationsInvalidationCache { } if (opType == OpType.CREATE || opType == OpType.ALTER) { // You store the materialized view - cq.put(tableName, new MaterializationInvalidationInfo(tablesUsed, validTxnList)); + Materialization materialization = new Materialization(tablesUsed); + materialization.setValidTxnList(validTxnList); + cq.put(tableName, materialization); } else { - ValidTxnList txnList = new ValidReadTxnList(validTxnList); + ValidTxnWriteIdList txnList = new ValidTxnWriteIdList(validTxnList); for (String qNameTableUsed : tablesUsed) { + ValidWriteIdList tableTxnList = txnList.getTableValidWriteIdList(qNameTableUsed); // First we insert a new tree set to keep table modifications, unless it already exists - ConcurrentSkipListMap<Long, Long> modificationsTree = - new ConcurrentSkipListMap<Long, Long>(); + ConcurrentSkipListMap<Long, Long> modificationsTree = new ConcurrentSkipListMap<>(); final 
ConcurrentSkipListMap<Long, Long> prevModificationsTree = tableModifications.putIfAbsent( qNameTableUsed, modificationsTree); if (prevModificationsTree != null) { @@ -222,7 +231,7 @@ public final class MaterializationsInvalidationCache { try { String[] names = qNameTableUsed.split("\\."); BasicTxnInfo e = handler.getTxnHandler().getFirstCompletedTransactionForTableAfterCommit( - names[0], names[1], txnList); + names[0], names[1], tableTxnList); if (!e.isIsnull()) { modificationsTree.put(e.getTxnid(), e.getTime()); // We do not need to do anything more for current table, as we detected @@ -236,7 +245,9 @@ public final class MaterializationsInvalidationCache { } } // For LOAD, you only add it if it does exist as you might be loading an outdated MV - cq.putIfAbsent(tableName, new MaterializationInvalidationInfo(tablesUsed, validTxnList)); + Materialization materialization = new Materialization(tablesUsed); + materialization.setValidTxnList(validTxnList); + cq.putIfAbsent(tableName, materialization); } if (LOG.isDebugEnabled()) { LOG.debug("Cached materialized view for rewriting in invalidation cache: " + @@ -249,7 +260,7 @@ public final class MaterializationsInvalidationCache { * invalidation for the MVs that use that table. 
*/ public void notifyTableModification(String dbName, String tableName, - long txnId, long newModificationTime) { + long txnId, long newModificationTime, boolean isUpdateDelete) { if (disable) { // Nothing to do return; @@ -258,8 +269,18 @@ public final class MaterializationsInvalidationCache { LOG.debug("Notification for table {} in database {} received -> id: {}, time: {}", tableName, dbName, txnId, newModificationTime); } - ConcurrentSkipListMap<Long, Long> modificationsTree = - new ConcurrentSkipListMap<Long, Long>(); + if (isUpdateDelete) { + // We update first the update/delete modifications record + ConcurrentSkipListSet<Long> modificationsSet = new ConcurrentSkipListSet<>(); + final ConcurrentSkipListSet<Long> prevModificationsSet = + updateDeleteTableModifications.putIfAbsent(Warehouse.getQualifiedName(dbName, tableName), + modificationsSet); + if (prevModificationsSet != null) { + modificationsSet = prevModificationsSet; + } + modificationsSet.add(txnId); + } + ConcurrentSkipListMap<Long, Long> modificationsTree = new ConcurrentSkipListMap<>(); final ConcurrentSkipListMap<Long, Long> prevModificationsTree = tableModifications.putIfAbsent(Warehouse.getQualifiedName(dbName, tableName), modificationsTree); if (prevModificationsTree != null) { @@ -293,30 +314,21 @@ public final class MaterializationsInvalidationCache { if (materializations.get(dbName) != null) { ImmutableMap.Builder<String, Materialization> m = ImmutableMap.builder(); for (String materializationName : materializationNames) { - MaterializationInvalidationInfo materialization = + Materialization materialization = materializations.get(dbName).get(materializationName); if (materialization == null) { LOG.debug("Materialization {} skipped as there is no information " + "in the invalidation cache about it", materializationName); continue; } - long invalidationTime = getInvalidationTime(materialization); - // We need to check whether previous value is zero, as data modification - // in another table 
used by the materialized view might have modified - // the value too - boolean modified = materialization.compareAndSetInvalidationTime(0L, invalidationTime); - while (!modified) { - long currentInvalidationTime = materialization.getInvalidationTime(); - if (invalidationTime < currentInvalidationTime) { - // It was set by other table modification, but it was after this table modification - // hence we need to set it - modified = materialization.compareAndSetInvalidationTime(currentInvalidationTime, invalidationTime); - } else { - // Nothing to do - modified = true; - } - } - m.put(materializationName, materialization); + // We create a deep copy of the materialization, as we need to set the time + // and whether any update/delete operation happen on the tables that it uses + // since it was created. + Materialization materializationCopy = new Materialization( + materialization.getTablesUsed()); + materializationCopy.setValidTxnList(materialization.getValidTxnList()); + enrichWithInvalidationInfo(materializationCopy); + m.put(materializationName, materializationCopy); } Map<String, Materialization> result = m.build(); if (LOG.isDebugEnabled()) { @@ -327,50 +339,65 @@ public final class MaterializationsInvalidationCache { return ImmutableMap.of(); } - private long getInvalidationTime(MaterializationInvalidationInfo materialization) { - String txnListString = materialization.getValidTxnList(); - if (txnListString == null) { + private void enrichWithInvalidationInfo(Materialization materialization) { + String materializationTxnListString = materialization.getValidTxnList(); + if (materializationTxnListString == null) { // This can happen when the materialization was created on non-transactional tables - return Long.MIN_VALUE; + materialization.setInvalidationTime(Long.MIN_VALUE); + return; } // We will obtain the modification time as follows. 
// First, we obtain the first element after high watermark (if any) // Then, we iterate through the elements from min open txn till high // watermark, updating the modification time after creation if needed - ValidTxnList txnList = new ValidReadTxnList(txnListString); + ValidTxnWriteIdList materializationTxnList = new ValidTxnWriteIdList(materializationTxnListString); long firstModificationTimeAfterCreation = 0L; + boolean containsUpdateDelete = false; for (String qNameTableUsed : materialization.getTablesUsed()) { - final Entry<Long, Long> tn = tableModifications.get(qNameTableUsed) - .higherEntry(txnList.getHighWatermark()); + final ValidWriteIdList tableMaterializationTxnList = + materializationTxnList.getTableValidWriteIdList(qNameTableUsed); + + final ConcurrentSkipListMap<Long, Long> usedTableModifications = + tableModifications.get(qNameTableUsed); + final ConcurrentSkipListSet<Long> usedUDTableModifications = + updateDeleteTableModifications.get(qNameTableUsed); + final Entry<Long, Long> tn = usedTableModifications.higherEntry(tableMaterializationTxnList.getHighWatermark()); if (tn != null) { if (firstModificationTimeAfterCreation == 0L || tn.getValue() < firstModificationTimeAfterCreation) { firstModificationTimeAfterCreation = tn.getValue(); } + // Check if there was any update/delete after creation + containsUpdateDelete = usedUDTableModifications != null && + !usedUDTableModifications.tailSet(tableMaterializationTxnList.getHighWatermark(), false).isEmpty(); } // Min open txn might be null if there were no open transactions // when this transaction was being executed - if (txnList.getMinOpenTxn() != null) { + if (tableMaterializationTxnList.getMinOpenWriteId() != null) { // Invalid transaction list is sorted int pos = 0; - for (Map.Entry<Long, Long> t : tableModifications.get(qNameTableUsed) - .subMap(txnList.getMinOpenTxn(), txnList.getHighWatermark()).entrySet()) { - while (pos < txnList.getInvalidTransactions().length && - 
txnList.getInvalidTransactions()[pos] != t.getKey()) { + for (Map.Entry<Long, Long> t : usedTableModifications + .subMap(tableMaterializationTxnList.getMinOpenWriteId(), tableMaterializationTxnList.getHighWatermark()).entrySet()) { + while (pos < tableMaterializationTxnList.getInvalidWriteIds().length && + tableMaterializationTxnList.getInvalidWriteIds()[pos] != t.getKey()) { pos++; } - if (pos >= txnList.getInvalidTransactions().length) { + if (pos >= tableMaterializationTxnList.getInvalidWriteIds().length) { break; } if (firstModificationTimeAfterCreation == 0L || t.getValue() < firstModificationTimeAfterCreation) { firstModificationTimeAfterCreation = t.getValue(); } + containsUpdateDelete = containsUpdateDelete || + (usedUDTableModifications != null && usedUDTableModifications.contains(t.getKey())); } } } - return firstModificationTimeAfterCreation; + + materialization.setInvalidationTime(firstModificationTimeAfterCreation); + materialization.setSourceTablesUpdateDeleteModified(containsUpdateDelete); } private enum OpType { @@ -395,16 +422,17 @@ public final class MaterializationsInvalidationCache { // We execute the cleanup in two steps // First we gather all the transactions that need to be kept final Multimap<String, Long> keepTxnInfos = HashMultimap.create(); - for (Map.Entry<String, ConcurrentMap<String, MaterializationInvalidationInfo>> e : materializations.entrySet()) { - for (MaterializationInvalidationInfo m : e.getValue().values()) { - ValidTxnList txnList = new ValidReadTxnList(m.getValidTxnList()); + for (Map.Entry<String, ConcurrentMap<String, Materialization>> e : materializations.entrySet()) { + for (Materialization m : e.getValue().values()) { + ValidTxnWriteIdList txnList = new ValidTxnWriteIdList(m.getValidTxnList()); boolean canBeDeleted = false; String currentTableForInvalidatingTxn = null; long currentInvalidatingTxnId = 0L; long currentInvalidatingTxnTime = 0L; for (String qNameTableUsed : m.getTablesUsed()) { + ValidWriteIdList 
tableTxnList = txnList.getTableValidWriteIdList(qNameTableUsed); final Entry<Long, Long> tn = tableModifications.get(qNameTableUsed) - .higherEntry(txnList.getHighWatermark()); + .higherEntry(tableTxnList.getHighWatermark()); if (tn != null) { if (currentInvalidatingTxnTime == 0L || tn.getValue() < currentInvalidatingTxnTime) { @@ -424,16 +452,16 @@ public final class MaterializationsInvalidationCache { currentInvalidatingTxnTime = tn.getValue(); } } - if (txnList.getMinOpenTxn() != null) { + if (tableTxnList.getMinOpenWriteId() != null) { // Invalid transaction list is sorted int pos = 0; for (Entry<Long, Long> t : tableModifications.get(qNameTableUsed) - .subMap(txnList.getMinOpenTxn(), txnList.getHighWatermark()).entrySet()) { - while (pos < txnList.getInvalidTransactions().length && - txnList.getInvalidTransactions()[pos] != t.getKey()) { + .subMap(tableTxnList.getMinOpenWriteId(), tableTxnList.getHighWatermark()).entrySet()) { + while (pos < tableTxnList.getInvalidWriteIds().length && + tableTxnList.getInvalidWriteIds()[pos] != t.getKey()) { pos++; } - if (pos >= txnList.getInvalidTransactions().length) { + if (pos >= tableTxnList.getInvalidWriteIds().length) { break; } if (currentInvalidatingTxnTime == 0L || @@ -462,6 +490,7 @@ public final class MaterializationsInvalidationCache { long removed = 0L; for (Entry<String, ConcurrentSkipListMap<Long, Long>> e : tableModifications.entrySet()) { Collection<Long> c = keepTxnInfos.get(e.getKey()); + ConcurrentSkipListSet<Long> updateDeleteForTable = updateDeleteTableModifications.get(e.getKey()); for (Iterator<Entry<Long, Long>> it = e.getValue().entrySet().iterator(); it.hasNext();) { Entry<Long, Long> v = it.next(); // We need to check again the time because some of the transactions might not be explored @@ -472,6 +501,9 @@ public final class MaterializationsInvalidationCache { LOG.debug("Transaction removed from cache for table {} -> id: {}, time: {}", e.getKey(), v.getKey(), v.getValue()); } + if 
(updateDeleteForTable != null) { + updateDeleteForTable.remove(v.getKey()); + } it.remove(); removed++; } @@ -480,4 +512,23 @@ public final class MaterializationsInvalidationCache { return removed; } + /** + * Checks whether the given materialization exists in the invalidation cache. + * @param dbName the database name for the materialization + * @param tblName the table name for the materialization + * @return true if we have information about the materialization in the cache, + * false otherwise + */ + public boolean containsMaterialization(String dbName, String tblName) { + if (disable || dbName == null || tblName == null) { + return false; + } + ConcurrentMap<String, Materialization> dbMaterializations = materializations.get(dbName); + if (dbMaterializations == null || dbMaterializations.get(tblName) == null) { + // This is a table + return false; + } + return true; + } + } http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java new file mode 100644 index 0000000..8ca9ede --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +/** + * Cleaner for the {@link MaterializationsRebuildLockHandler}. It removes outdated locks + * in the intervals specified by the input property. + */ +public class MaterializationsRebuildLockCleanerTask implements MetastoreTaskThread { + private static final Logger LOG = LoggerFactory.getLogger(MaterializationsRebuildLockCleanerTask.class); + + private Configuration conf; + + @Override + public long runFrequency(TimeUnit unit) { + return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT, unit) / 2; + } + + @Override + public void setConf(Configuration configuration) { + conf = configuration; + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void run() { + long removedCnt = MaterializationsRebuildLockHandler.get().cleanupResourceLocks( + MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS)); + if (removedCnt > 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Number of materialization locks deleted: " + removedCnt); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockHandler.java ---------------------------------------------------------------------- diff --git
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockHandler.java new file mode 100644 index 0000000..dd31226 --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockHandler.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore; + +import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.LockState; +import org.apache.hadoop.hive.metastore.api.MetaException; + +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +/** + * This is a lock handler implementation for the materializations rebuild. + * It is lightweight: it does not persist any information to metastore db. 
+ * Its states are as follows: + * 1) request lock -> 2) ACQUIRED -> 4) COMMIT_READY -> 6) release lock + * -> 5) EXPIRED -> + * -> 3) NOT_ACQUIRED + * First, the rebuild operation will ACQUIRE the lock. If another rebuild + * operation for the same materialization is already running, the lock status + * will be NOT_ACQUIRED. + * Before committing the rebuild, the txn handler will signal the handler + * that it is ready to commit the resource (move state to COMMIT_READY). + * We make sure the lock is still available before moving to the new state. + * A lock will not be able to expire when it is in COMMIT_READY state. + * The unlock method is always called by the txn handler, no matter whether + * the transaction succeeds or not, e.g., due to an Exception. + * From ACQUIRED, locks can also be moved to EXPIRED state when they + * expire. From EXPIRED, they can only be released. + */ +public class MaterializationsRebuildLockHandler { + + /* Singleton */ + private static final MaterializationsRebuildLockHandler SINGLETON = new MaterializationsRebuildLockHandler(); + + private final ConcurrentMap<String, ResourceLock> locks = new ConcurrentHashMap<>(); + + private MaterializationsRebuildLockHandler() { + } + + /** + * Get instance of MaterializationsRebuildLockHandler. + * + * @return the singleton + */ + public static MaterializationsRebuildLockHandler get() { + return SINGLETON; + } + + /** + * Lock materialized view (first step for rebuild). Response contains a lock id + * that corresponds to the input transaction id, and whether the lock was + * ACQUIRED or NOT_ACQUIRED.
+ * @param dbName the db name of the materialization + * @param tableName the table name of the materialization + * @param txnId the transaction id for the rebuild + * @return the response to the lock request + */ + public LockResponse lockResource(String dbName, String tableName, long txnId) { + final ResourceLock prevResourceLock = locks.putIfAbsent( + Warehouse.getQualifiedName(dbName, tableName), + new ResourceLock(txnId, System.currentTimeMillis(), State.ACQUIRED)); + if (prevResourceLock != null) { + return new LockResponse(txnId, LockState.NOT_ACQUIRED); + } + return new LockResponse(txnId, LockState.ACQUIRED); + } + + /** + * Moves from ACQUIRED state to COMMIT_READY. + * @param dbName the db name of the materialization + * @param tableName the table name of the materialization + * @param txnId the transaction id for the rebuild + * @return true if the lock was still active and we could move the materialization + * to COMMIT_READY state, false otherwise + */ + public boolean readyToCommitResource(String dbName, String tableName, long txnId) { + final ResourceLock prevResourceLock = locks.get(Warehouse.getQualifiedName(dbName, tableName)); + if (prevResourceLock == null || prevResourceLock.txnId != txnId) { + // Lock was outdated and it was removed (then maybe another transaction picked it up) + return false; + } + return prevResourceLock.state.compareAndSet(State.ACQUIRED, State.COMMIT_READY); + } + + /** + * Heartbeats a certain lock and refreshes its timer.
+ * @param dbName the db name of the materialization + * @param tableName the table name of the materialization + * @param txnId the transaction id for the rebuild + * @return true if the lock is still held by this transaction in ACQUIRED state and its timer was refreshed, false otherwise + */ + public boolean refreshLockResource(String dbName, String tableName, long txnId) { + final ResourceLock prevResourceLock = locks.get(Warehouse.getQualifiedName(dbName, tableName)); + if (prevResourceLock == null || prevResourceLock.txnId != txnId || + prevResourceLock.state.get() != State.ACQUIRED) { + // Lock was outdated and it was removed (then maybe another transaction picked it up) + // or changed its state + return false; + } + prevResourceLock.lastHeartBeatTime.set(System.currentTimeMillis()); + return true; + } + + /** + * Releases a certain lock. + * @param dbName the db name of the materialization + * @param tableName the table name of the materialization + * @param txnId the transaction id for the rebuild + * @return true if the lock could be released properly, false otherwise + * @throws MetaException + */ + public boolean unlockResource(String dbName, String tableName, long txnId) { + final String fullyQualifiedName = Warehouse.getQualifiedName(dbName, tableName); + final ResourceLock prevResourceLock = locks.get(fullyQualifiedName); + if (prevResourceLock == null || prevResourceLock.txnId != txnId) { + return false; + } + return locks.remove(fullyQualifiedName, prevResourceLock); + } + + /** + * Method that removes from the handler those locks that have expired.
+ * @param timeout time in milliseconds after which we consider the locks to have expired + * @return the number of locks that were removed + */ + public long cleanupResourceLocks(long timeout) { + long removed = 0L; + final long currentTime = System.currentTimeMillis(); + for (Iterator<Map.Entry<String, ResourceLock>> it = locks.entrySet().iterator(); it.hasNext();) { + final ResourceLock resourceLock = it.next().getValue(); + if (currentTime - resourceLock.lastHeartBeatTime.get() > timeout) { + if (resourceLock.state.compareAndSet(State.ACQUIRED, State.EXPIRED)) { + it.remove(); + removed++; + } + } + } + return removed; + } + + /** + * This class represents a lock that consists of transaction id, + * last refresh time, and state. + */ + private class ResourceLock { + final long txnId; + final AtomicLong lastHeartBeatTime; + final AtomicStateEnum state; + + ResourceLock(long txnId, long lastHeartBeatTime, State state) { + this.txnId = txnId; + this.lastHeartBeatTime = new AtomicLong(lastHeartBeatTime); + this.state = new AtomicStateEnum(state); + } + } + + private enum State { + // This is the initial state for a lock + ACQUIRED, + // This means that the lock is being committed at this instant, hence + // the cleaner should not remove it even if it times out. If transaction + // fails, the finally clause will remove the lock + COMMIT_READY, + // This means that the lock is ready to be cleaned, hence it cannot + // be committed anymore + EXPIRED; + } + + /** + * Wrapper class around State enum to make its operations atomic.
+ */ + private class AtomicStateEnum { + private final AtomicReference<State> ref; + + public AtomicStateEnum(final State initialValue) { + this.ref = new AtomicReference<State>(initialValue); + } + + public void set(final State newValue) { + this.ref.set(newValue); + } + + public State get() { + return this.ref.get(); + } + + public State getAndSet(final State newValue) { + return this.ref.getAndSet(newValue); + } + + public boolean compareAndSet(final State expect, final State update) { + return this.ref.compareAndSet(expect, update); + } + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index 940a1bf..f007261 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.DefaultStorageSchemaReader; import org.apache.hadoop.hive.metastore.HiveAlterHandler; import org.apache.hadoop.hive.metastore.MaterializationsCacheCleanerTask; +import org.apache.hadoop.hive.metastore.MaterializationsRebuildLockCleanerTask; import org.apache.hadoop.hive.metastore.MetastoreTaskThread; import org.apache.hadoop.hive.metastore.events.EventCleanerTask; import org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager; @@ -728,7 +729,8 @@ public class MetastoreConf { TASK_THREADS_ALWAYS("metastore.task.threads.always", "metastore.task.threads.always", EventCleanerTask.class.getName() + "," + "org.apache.hadoop.hive.metastore.repl.DumpDirCleanerTask" + "," + - 
MaterializationsCacheCleanerTask.class.getName(), + MaterializationsCacheCleanerTask.class.getName() + "," + + MaterializationsRebuildLockCleanerTask.class.getName(), "Comma separated list of tasks that will be started in separate threads. These will " + "always be started, regardless of whether the metastore is running in embedded mode " + "or in server mode. They must implement " + MetastoreTaskThread.class.getName()),