Repository: hive Updated Branches: refs/heads/master a285de00d -> 5be9c92fb
http://git-wip-us.apache.org/repos/asf/hive/blob/5be9c92f/metastore/src/gen/thrift/gen-php/metastore/ThriftHiveMetastore.php ---------------------------------------------------------------------- diff --git a/metastore/src/gen/thrift/gen-php/metastore/ThriftHiveMetastore.php b/metastore/src/gen/thrift/gen-php/metastore/ThriftHiveMetastore.php index d0289fe..7f3c3ea 100644 --- a/metastore/src/gen/thrift/gen-php/metastore/ThriftHiveMetastore.php +++ b/metastore/src/gen/thrift/gen-php/metastore/ThriftHiveMetastore.php @@ -1065,6 +1065,11 @@ interface ThriftHiveMetastoreIf extends \FacebookServiceIf { * @return \metastore\CacheFileMetadataResult */ public function cache_file_metadata(\metastore\CacheFileMetadataRequest $req); + /** + * @param \metastore\GetChangeVersionRequest $req + * @return \metastore\GetChangeVersionResult + */ + public function get_change_version(\metastore\GetChangeVersionRequest $req); } class ThriftHiveMetastoreClient extends \FacebookServiceClient implements \metastore\ThriftHiveMetastoreIf { @@ -8683,6 +8688,57 @@ class ThriftHiveMetastoreClient extends \FacebookServiceClient implements \metas throw new \Exception("cache_file_metadata failed: unknown result"); } + public function get_change_version(\metastore\GetChangeVersionRequest $req) + { + $this->send_get_change_version($req); + return $this->recv_get_change_version(); + } + + public function send_get_change_version(\metastore\GetChangeVersionRequest $req) + { + $args = new \metastore\ThriftHiveMetastore_get_change_version_args(); + $args->req = $req; + $bin_accel = ($this->output_ instanceof TBinaryProtocolAccelerated) && function_exists('thrift_protocol_write_binary'); + if ($bin_accel) + { + thrift_protocol_write_binary($this->output_, 'get_change_version', TMessageType::CALL, $args, $this->seqid_, $this->output_->isStrictWrite()); + } + else + { + $this->output_->writeMessageBegin('get_change_version', TMessageType::CALL, $this->seqid_); + $args->write($this->output_); + $this->output_->writeMessageEnd(); + $this->output_->getTransport()->flush(); + } + } + + public function recv_get_change_version() + { + $bin_accel = ($this->input_ instanceof TBinaryProtocolAccelerated) && function_exists('thrift_protocol_read_binary'); + if ($bin_accel) $result = thrift_protocol_read_binary($this->input_, '\metastore\ThriftHiveMetastore_get_change_version_result', $this->input_->isStrictRead()); + else + { + $rseqid = 0; + $fname = null; + $mtype = 0; + + $this->input_->readMessageBegin($fname, $mtype, $rseqid); + if ($mtype == TMessageType::EXCEPTION) { + $x = new TApplicationException(); + $x->read($this->input_); + $this->input_->readMessageEnd(); + throw $x; + } + $result = new \metastore\ThriftHiveMetastore_get_change_version_result(); + $result->read($this->input_); + $this->input_->readMessageEnd(); + } + if ($result->success !== null) { + return $result->success; + } + throw new \Exception("get_change_version failed: unknown result"); + } + } // HELPER FUNCTIONS AND STRUCTURES @@ -40023,4 +40079,163 @@ class ThriftHiveMetastore_cache_file_metadata_result { } +class ThriftHiveMetastore_get_change_version_args { + static $_TSPEC; + + /** + * @var \metastore\GetChangeVersionRequest + */ + public $req = null; + + public function __construct($vals=null) { + if (!isset(self::$_TSPEC)) { + self::$_TSPEC = array( + 1 => array( + 'var' => 'req', + 'type' => TType::STRUCT, + 'class' => '\metastore\GetChangeVersionRequest', + ), + ); + } + if (is_array($vals)) { + if (isset($vals['req'])) { + $this->req = $vals['req']; + } + } + } + + public function getName() { + return 'ThriftHiveMetastore_get_change_version_args'; + } + + public function read($input) + { + $xfer = 0; + $fname = null; + $ftype = 0; + $fid = 0; + $xfer += $input->readStructBegin($fname); + while (true) + { + $xfer += $input->readFieldBegin($fname, $ftype, $fid); + if ($ftype == TType::STOP) { + break; + } + switch ($fid) + { + case 1: + if ($ftype == TType::STRUCT) { + $this->req = new \metastore\GetChangeVersionRequest(); + $xfer += $this->req->read($input); + } else { + $xfer += $input->skip($ftype); + } + break; + default: + $xfer += $input->skip($ftype); + break; + } + $xfer += $input->readFieldEnd(); + } + $xfer += $input->readStructEnd(); + return $xfer; + } + + public function write($output) { + $xfer = 0; + $xfer += $output->writeStructBegin('ThriftHiveMetastore_get_change_version_args'); + if ($this->req !== null) { + if (!is_object($this->req)) { + throw new TProtocolException('Bad type in structure.', TProtocolException::INVALID_DATA); + } + $xfer += $output->writeFieldBegin('req', TType::STRUCT, 1); + $xfer += $this->req->write($output); + $xfer += $output->writeFieldEnd(); + } + $xfer += $output->writeFieldStop(); + $xfer += $output->writeStructEnd(); + return $xfer; + } + +} + +class ThriftHiveMetastore_get_change_version_result { + static $_TSPEC; + + /** + * @var \metastore\GetChangeVersionResult + */ + public $success = null; + + public function __construct($vals=null) { + if (!isset(self::$_TSPEC)) { + self::$_TSPEC = array( + 0 => array( + 'var' => 'success', + 'type' => TType::STRUCT, + 'class' => '\metastore\GetChangeVersionResult', + ), + ); + } + if (is_array($vals)) { + if (isset($vals['success'])) { + $this->success = $vals['success']; + } + } + } + + public function getName() { + return 'ThriftHiveMetastore_get_change_version_result'; + } + + public function read($input) + { + $xfer = 0; + $fname = null; + $ftype = 0; + $fid = 0; + $xfer += $input->readStructBegin($fname); + while (true) + { + $xfer += $input->readFieldBegin($fname, $ftype, $fid); + if ($ftype == TType::STOP) { + break; + } + switch ($fid) + { + case 0: + if ($ftype == TType::STRUCT) { + $this->success = new \metastore\GetChangeVersionResult(); + $xfer += $this->success->read($input); + } else { + $xfer += $input->skip($ftype); + } + break; + default: + $xfer += $input->skip($ftype); + break; + } + $xfer += $input->readFieldEnd(); + } + $xfer += $input->readStructEnd(); + return $xfer; + } + + public function write($output) { + $xfer = 0; + $xfer += $output->writeStructBegin('ThriftHiveMetastore_get_change_version_result'); + if ($this->success !== null) { + if (!is_object($this->success)) { + throw new TProtocolException('Bad type in structure.', TProtocolException::INVALID_DATA); + } + $xfer += $output->writeFieldBegin('success', TType::STRUCT, 0); + $xfer += $this->success->write($output); + $xfer += $output->writeFieldEnd(); + } + $xfer += $output->writeFieldStop(); + $xfer += $output->writeStructEnd(); + return $xfer; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/5be9c92f/metastore/src/gen/thrift/gen-php/metastore/Types.php ---------------------------------------------------------------------- diff --git a/metastore/src/gen/thrift/gen-php/metastore/Types.php b/metastore/src/gen/thrift/gen-php/metastore/Types.php index 57d1daf..488a920 100644 --- a/metastore/src/gen/thrift/gen-php/metastore/Types.php +++ b/metastore/src/gen/thrift/gen-php/metastore/Types.php @@ -14491,6 +14491,156 @@ class FireEventResponse { } +class GetChangeVersionRequest { + static $_TSPEC; + + /** + * @var string + */ + public $topic = null; + + public function __construct($vals=null) { + if (!isset(self::$_TSPEC)) { + self::$_TSPEC = array( + 1 => array( + 'var' => 'topic', + 'type' => TType::STRING, + ), + ); + } + if (is_array($vals)) { + if (isset($vals['topic'])) { + $this->topic = $vals['topic']; + } + } + } + + public function getName() { + return 'GetChangeVersionRequest'; + } + + public function read($input) + { + $xfer = 0; + $fname = null; + $ftype = 0; + $fid = 0; + $xfer += $input->readStructBegin($fname); + while (true) + { + $xfer += $input->readFieldBegin($fname, $ftype, $fid); + if ($ftype == TType::STOP) { + break; + } + switch ($fid) + { + case 1: + if ($ftype == TType::STRING) { + $xfer += $input->readString($this->topic); + } else { + $xfer += $input->skip($ftype); + } + break; + default: + $xfer += $input->skip($ftype); + break; + } + $xfer += $input->readFieldEnd(); + } + $xfer += $input->readStructEnd(); + return $xfer; + } + + public function write($output) { + $xfer = 0; + $xfer += $output->writeStructBegin('GetChangeVersionRequest'); + if ($this->topic !== null) { + $xfer += $output->writeFieldBegin('topic', TType::STRING, 1); + $xfer += $output->writeString($this->topic); + $xfer += $output->writeFieldEnd(); + } + $xfer += $output->writeFieldStop(); + $xfer += $output->writeStructEnd(); + return $xfer; + } + +} + +class GetChangeVersionResult { + static $_TSPEC; + + /** + * @var int + */ + public $version = null; + + public function __construct($vals=null) { + if (!isset(self::$_TSPEC)) { + self::$_TSPEC = array( + 1 => array( + 'var' => 'version', + 'type' => TType::I64, + ), + ); + } + if (is_array($vals)) { + if (isset($vals['version'])) { + $this->version = $vals['version']; + } + } + } + + public function getName() { + return 'GetChangeVersionResult'; + } + + public function read($input) + { + $xfer = 0; + $fname = null; + $ftype = 0; + $fid = 0; + $xfer += $input->readStructBegin($fname); + while (true) + { + $xfer += $input->readFieldBegin($fname, $ftype, $fid); + if ($ftype == TType::STOP) { + break; + } + switch ($fid) + { + case 1: + if ($ftype == TType::I64) { + $xfer += $input->readI64($this->version); + } else { + $xfer += $input->skip($ftype); + } + break; + default: + $xfer += $input->skip($ftype); + break; + } + $xfer += $input->readFieldEnd(); + } + $xfer += $input->readStructEnd(); + return $xfer; + } + + public function write($output) { + $xfer = 0; + $xfer += $output->writeStructBegin('GetChangeVersionResult'); + if ($this->version !== null) { + $xfer += $output->writeFieldBegin('version', TType::I64, 1); + $xfer += $output->writeI64($this->version); + $xfer += $output->writeFieldEnd(); + } + $xfer += $output->writeFieldStop(); + $xfer += $output->writeStructEnd(); + return $xfer; + } + +} + class MetadataPpdResult { static $_TSPEC; http://git-wip-us.apache.org/repos/asf/hive/blob/5be9c92f/metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore-remote ---------------------------------------------------------------------- diff --git a/metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore-remote b/metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore-remote index 407f73c..da25a6e 100755 --- a/metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore-remote +++ b/metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore-remote @@ -157,6 +157,7 @@ if len(sys.argv) <= 1 or sys.argv[1] == '--help': print(' PutFileMetadataResult put_file_metadata(PutFileMetadataRequest req)') print(' ClearFileMetadataResult clear_file_metadata(ClearFileMetadataRequest req)') print(' CacheFileMetadataResult cache_file_metadata(CacheFileMetadataRequest req)') + print(' GetChangeVersionResult get_change_version(GetChangeVersionRequest req)') print(' string getName()') print(' string getVersion()') print(' fb_status getStatus()') @@ -1024,6 +1025,12 @@ elif cmd == 'cache_file_metadata': sys.exit(1) pp.pprint(client.cache_file_metadata(eval(args[0]),)) +elif cmd == 'get_change_version': + if len(args) != 1: + print('get_change_version requires 1 args') + sys.exit(1) + pp.pprint(client.get_change_version(eval(args[0]),)) + elif cmd == 'getName': if len(args) != 0: print('getName requires 0 args') http://git-wip-us.apache.org/repos/asf/hive/blob/5be9c92f/metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore.py ---------------------------------------------------------------------- diff --git a/metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore.py b/metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore.py index 26d8a02..60fb905 100644 --- a/metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore.py +++ b/metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore.py @@ -1089,6 +1089,13 @@ class Iface(fb303.FacebookService.Iface): """ pass + def get_change_version(self, req): + """ + Parameters: + - req + """ + pass + class Client(fb303.FacebookService.Client, Iface): """ @@ -5959,6 +5966,37 @@ class Client(fb303.FacebookService.Client, Iface): return result.success raise TApplicationException(TApplicationException.MISSING_RESULT, "cache_file_metadata failed: unknown result") + def get_change_version(self, req): + """ + Parameters: + - req + """ + self.send_get_change_version(req) + return self.recv_get_change_version() + + def send_get_change_version(self, req): + self._oprot.writeMessageBegin('get_change_version', TMessageType.CALL, self._seqid) + args = get_change_version_args() + args.req = req + args.write(self._oprot) + self._oprot.writeMessageEnd() + self._oprot.trans.flush() + + def recv_get_change_version(self): + iprot = self._iprot + (fname, mtype, rseqid) = iprot.readMessageBegin() + if mtype == TMessageType.EXCEPTION: + x = TApplicationException() + x.read(iprot) + iprot.readMessageEnd() + raise x + result = get_change_version_result() + result.read(iprot) + iprot.readMessageEnd() + if result.success is not None: + return result.success + raise TApplicationException(TApplicationException.MISSING_RESULT, "get_change_version failed: unknown result") + class Processor(fb303.FacebookService.Processor, Iface, TProcessor): def __init__(self, handler): @@ -6096,6 +6134,7 @@ class Processor(fb303.FacebookService.Processor, Iface, TProcessor): self._processMap["put_file_metadata"] = Processor.process_put_file_metadata self._processMap["clear_file_metadata"] = Processor.process_clear_file_metadata self._processMap["cache_file_metadata"] = Processor.process_cache_file_metadata + self._processMap["get_change_version"] = Processor.process_get_change_version def process(self, iprot, oprot): (name, type, seqid) = iprot.readMessageBegin() @@ -9395,6 +9434,25 @@ class Processor(fb303.FacebookService.Processor, Iface, TProcessor): oprot.writeMessageEnd() oprot.trans.flush() + def process_get_change_version(self, seqid, iprot, oprot): + args = get_change_version_args() + args.read(iprot) + iprot.readMessageEnd() + result = get_change_version_result() + try: + result.success = self._handler.get_change_version(args.req) + msg_type = TMessageType.REPLY + except (TTransport.TTransportException, KeyboardInterrupt, SystemExit): + raise + except Exception as ex: + msg_type = TMessageType.EXCEPTION + logging.exception(ex) + result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error') + oprot.writeMessageBegin("get_change_version", msg_type, seqid) + result.write(oprot) + oprot.writeMessageEnd() + oprot.trans.flush() + # HELPER FUNCTIONS AND STRUCTURES @@ -32330,3 +32388,134 @@ class cache_file_metadata_result: def __ne__(self, other): return not (self == other) + +class get_change_version_args: + """ + Attributes: + - req + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRUCT, 'req', (GetChangeVersionRequest, GetChangeVersionRequest.thrift_spec), None, ), # 1 + ) + + def __init__(self, req=None,): + self.req = req + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRUCT: + self.req = GetChangeVersionRequest() + self.req.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('get_change_version_args') + if self.req is not None: + oprot.writeFieldBegin('req', TType.STRUCT, 1) + self.req.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.req) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class get_change_version_result: + """ + Attributes: + - success + """ + + thrift_spec = ( + (0, TType.STRUCT, 'success', (GetChangeVersionResult, GetChangeVersionResult.thrift_spec), None, ), # 0 + ) + + def __init__(self, success=None,): + self.success = success + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 0: + if ftype == TType.STRUCT: + self.success = GetChangeVersionResult() + self.success.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('get_change_version_result') + if self.success is not None: + oprot.writeFieldBegin('success', TType.STRUCT, 0) + self.success.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.success) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) http://git-wip-us.apache.org/repos/asf/hive/blob/5be9c92f/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py ---------------------------------------------------------------------- diff --git a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py index 77dd9a6..10eaf4a 100644 --- a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py +++ b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py @@ -10118,6 +10118,140 @@ class FireEventResponse: def __ne__(self, other): return not (self == other) +class GetChangeVersionRequest: + """ + Attributes: + - topic + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRING, 'topic', None, None, ), # 1 + ) + + def __init__(self, topic=None,): + self.topic = topic + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.topic = iprot.readString() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('GetChangeVersionRequest') + if self.topic is not None: + oprot.writeFieldBegin('topic', TType.STRING, 1) + oprot.writeString(self.topic) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.topic is None: + raise TProtocol.TProtocolException(message='Required field topic is unset!') + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.topic) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class GetChangeVersionResult: + """ + Attributes: + - version + """ + + thrift_spec = ( + None, # 0 + (1, TType.I64, 'version', None, None, ), # 1 + ) + + def __init__(self, version=None,): + self.version = version + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.I64: + self.version = iprot.readI64() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('GetChangeVersionResult') + if self.version is not None: + oprot.writeFieldBegin('version', TType.I64, 1) + oprot.writeI64(self.version) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.version is None: + raise TProtocol.TProtocolException(message='Required field version is unset!') + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.version) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + class MetadataPpdResult: """ Attributes: http://git-wip-us.apache.org/repos/asf/hive/blob/5be9c92f/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb ---------------------------------------------------------------------- diff --git a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb index 2cf433b..1cf40ae 100644 --- a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ -2290,6 +2290,40 @@ class FireEventResponse ::Thrift::Struct.generate_accessors self end +class GetChangeVersionRequest + include ::Thrift::Struct, ::Thrift::Struct_Union + TOPIC = 1 + + FIELDS = { + TOPIC => {:type => ::Thrift::Types::STRING, :name => 'topic'} + } + + def struct_fields; FIELDS; end + + def validate + raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field topic is unset!') unless @topic + end + + ::Thrift::Struct.generate_accessors self +end + +class GetChangeVersionResult + include ::Thrift::Struct, ::Thrift::Struct_Union + VERSION = 1 + + FIELDS = { + VERSION => {:type => ::Thrift::Types::I64, :name => 'version'} + } + + def struct_fields; FIELDS; end + + def validate + raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field version is unset!') unless @version + end + + ::Thrift::Struct.generate_accessors self +end + class MetadataPpdResult include ::Thrift::Struct, ::Thrift::Struct_Union METADATA = 1 http://git-wip-us.apache.org/repos/asf/hive/blob/5be9c92f/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb ---------------------------------------------------------------------- diff --git a/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb b/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb index 9b9a27c..9e47f7e 100644 --- a/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb +++ b/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb @@ -2231,6 +2231,21 @@ module ThriftHiveMetastore raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'cache_file_metadata failed: unknown result') end + def get_change_version(req) + send_get_change_version(req) + return recv_get_change_version() + end + + def send_get_change_version(req) + send_message('get_change_version', Get_change_version_args, :req => req) + end + + def recv_get_change_version() + result = receive_message(Get_change_version_result) + return result.success unless result.success.nil? + raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'get_change_version failed: unknown result') + end + end class Processor < ::FacebookService::Processor @@ -3905,6 +3920,13 @@ module ThriftHiveMetastore write_result(result, oprot, 'cache_file_metadata', seqid) end + def process_get_change_version(seqid, iprot, oprot) + args = read_args(iprot, Get_change_version_args) + result = Get_change_version_result.new() + result.success = @handler.get_change_version(args.req) + write_result(result, oprot, 'get_change_version', seqid) + end + end # HELPER FUNCTIONS AND STRUCTURES @@ -8955,5 +8977,37 @@ module ThriftHiveMetastore ::Thrift::Struct.generate_accessors self end + class Get_change_version_args + include ::Thrift::Struct, ::Thrift::Struct_Union + REQ = 1 + + FIELDS = { + REQ => {:type => ::Thrift::Types::STRUCT, :name => 'req', :class => ::GetChangeVersionRequest} + } + + def struct_fields; FIELDS; end + + def validate + end + + ::Thrift::Struct.generate_accessors self + end + + class Get_change_version_result + include ::Thrift::Struct, ::Thrift::Struct_Union + SUCCESS = 0 + + FIELDS = { + SUCCESS => {:type => ::Thrift::Types::STRUCT, :name => 'success', :class => ::GetChangeVersionResult} + } + + def struct_fields; FIELDS; end + + def validate + end + + ::Thrift::Struct.generate_accessors self + end + end http://git-wip-us.apache.org/repos/asf/hive/blob/5be9c92f/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index bb33693..98fbf70 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -43,110 +43,7 @@ import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; import org.apache.hadoop.hive.common.metrics.common.MetricsVariable; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; -import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions; -import org.apache.hadoop.hive.metastore.api.AddPartitionsRequest; -import org.apache.hadoop.hive.metastore.api.AddPartitionsResult; -import org.apache.hadoop.hive.metastore.api.AggrStats; -import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; -import org.apache.hadoop.hive.metastore.api.CacheFileMetadataRequest; -import org.apache.hadoop.hive.metastore.api.CacheFileMetadataResult; -import org.apache.hadoop.hive.metastore.api.CheckLockRequest; -import org.apache.hadoop.hive.metastore.api.ClearFileMetadataRequest; -import org.apache.hadoop.hive.metastore.api.ClearFileMetadataResult; -import org.apache.hadoop.hive.metastore.api.ColumnStatistics; -import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; -import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; -import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; -import org.apache.hadoop.hive.metastore.api.CompactionRequest; -import org.apache.hadoop.hive.metastore.api.ConfigValSecurityException; -import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; -import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.DropPartitionsExpr; -import org.apache.hadoop.hive.metastore.api.DropPartitionsRequest; -import org.apache.hadoop.hive.metastore.api.DropPartitionsResult; -import org.apache.hadoop.hive.metastore.api.EnvironmentContext; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.FileMetadataExprType; -import org.apache.hadoop.hive.metastore.api.FireEventRequest; -import org.apache.hadoop.hive.metastore.api.FireEventResponse; -import org.apache.hadoop.hive.metastore.api.Function; -import org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse; -import org.apache.hadoop.hive.metastore.api.GetFileMetadataByExprRequest; -import org.apache.hadoop.hive.metastore.api.GetFileMetadataByExprResult; -import org.apache.hadoop.hive.metastore.api.GetFileMetadataRequest; -import org.apache.hadoop.hive.metastore.api.GetFileMetadataResult; -import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; -import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; -import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleRequest; -import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleResponse; -import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalRequest; -import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalResponse; -import org.apache.hadoop.hive.metastore.api.GrantRevokePrivilegeRequest; -import org.apache.hadoop.hive.metastore.api.GrantRevokePrivilegeResponse; -import org.apache.hadoop.hive.metastore.api.GrantRevokeRoleRequest; -import org.apache.hadoop.hive.metastore.api.GrantRevokeRoleResponse; -import org.apache.hadoop.hive.metastore.api.HeartbeatRequest; -import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest; -import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse; -import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege; -import org.apache.hadoop.hive.metastore.api.HiveObjectRef; -import org.apache.hadoop.hive.metastore.api.HiveObjectType; -import org.apache.hadoop.hive.metastore.api.Index; -import org.apache.hadoop.hive.metastore.api.InvalidInputException; -import org.apache.hadoop.hive.metastore.api.InvalidObjectException; -import org.apache.hadoop.hive.metastore.api.InvalidOperationException; -import org.apache.hadoop.hive.metastore.api.InvalidPartitionException; -import org.apache.hadoop.hive.metastore.api.LockRequest; -import org.apache.hadoop.hive.metastore.api.LockResponse; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.MetadataPpdResult; -import org.apache.hadoop.hive.metastore.api.NoSuchLockException; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; -import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; -import org.apache.hadoop.hive.metastore.api.NotificationEventRequest; -import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; -import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; -import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.PartitionEventType; -import org.apache.hadoop.hive.metastore.api.PartitionListComposingSpec; -import org.apache.hadoop.hive.metastore.api.PartitionSpec; -import org.apache.hadoop.hive.metastore.api.PartitionSpecWithSharedSD; -import org.apache.hadoop.hive.metastore.api.PartitionWithoutSD; -import org.apache.hadoop.hive.metastore.api.PartitionsByExprRequest; -import org.apache.hadoop.hive.metastore.api.PartitionsByExprResult; -import org.apache.hadoop.hive.metastore.api.PartitionsStatsRequest; -import org.apache.hadoop.hive.metastore.api.PartitionsStatsResult; -import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet; -import org.apache.hadoop.hive.metastore.api.PrincipalType; -import org.apache.hadoop.hive.metastore.api.PrivilegeBag; -import org.apache.hadoop.hive.metastore.api.PrivilegeGrantInfo; -import org.apache.hadoop.hive.metastore.api.PutFileMetadataRequest; -import org.apache.hadoop.hive.metastore.api.PutFileMetadataResult; -import org.apache.hadoop.hive.metastore.api.RequestPartsSpec; -import org.apache.hadoop.hive.metastore.api.Role; -import org.apache.hadoop.hive.metastore.api.RolePrincipalGrant; -import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest; -import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; -import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; -import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; -import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; -import org.apache.hadoop.hive.metastore.api.SkewedInfo; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.api.TableMeta; -import org.apache.hadoop.hive.metastore.api.TableStatsRequest; -import org.apache.hadoop.hive.metastore.api.TableStatsResult; -import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore; -import org.apache.hadoop.hive.metastore.api.TxnAbortedException; -import org.apache.hadoop.hive.metastore.api.TxnOpenException; -import org.apache.hadoop.hive.metastore.api.Type; -import org.apache.hadoop.hive.metastore.api.UnknownDBException; -import org.apache.hadoop.hive.metastore.api.UnknownPartitionException; -import org.apache.hadoop.hive.metastore.api.UnknownTableException; -import org.apache.hadoop.hive.metastore.api.UnlockRequest; -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.metastore.api.*; import org.apache.hadoop.hive.metastore.events.AddIndexEvent; import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; import org.apache.hadoop.hive.metastore.events.AlterIndexEvent; @@ -6037,6 +5934,12 @@ public class HiveMetaStore extends ThriftHiveMetastore { initPartCount = getMS().getPartitionCount(); initDatabaseCount = getMS().getDatabaseCount(); } + + @Override + public GetChangeVersionResult get_change_version(GetChangeVersionRequest req) + throws TException { + return new GetChangeVersionResult(getMS().getChangeVersion(req.getTopic())); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/5be9c92f/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index 0c30262..564fca4 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hive.metastore.api.FireEventRequest; import org.apache.hadoop.hive.metastore.api.FireEventResponse; import org.apache.hadoop.hive.metastore.api.Function; import org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse; +import org.apache.hadoop.hive.metastore.api.GetChangeVersionRequest; import org.apache.hadoop.hive.metastore.api.GetFileMetadataRequest; import org.apache.hadoop.hive.metastore.api.GetFileMetadataResult; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; @@ -2294,4 +2295,9 @@ public class HiveMetaStoreClient implements IMetaStoreClient { CacheFileMetadataResult result = client.cache_file_metadata(req); return result.isIsSupported(); } + + @Override + public long getChangeVersion(String topic) throws TException { + return client.get_change_version(new GetChangeVersionRequest(topic)).getVersion(); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/5be9c92f/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java index 4284d54..07c20bf 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java @@ -91,6 +91,7 @@ import java.util.Map.Entry; @Public @Evolving public interface IMetaStoreClient { + public static final String PERMANENT_FUNCTION_CV = "PERMANENT_FUNCTION"; /** * Returns whether current client is compatible with conf argument or not @@ -1528,4 +1529,6 @@ public interface IMetaStoreClient { boolean cacheFileMetadata(String dbName, String tableName, String partName, boolean allParts) throws TException; + + long getChangeVersion(String topic) throws TException; } http://git-wip-us.apache.org/repos/asf/hive/blob/5be9c92f/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java index b808728..8d05f49 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -113,6 +113,7 @@ import org.apache.hadoop.hive.metastore.api.Type; import org.apache.hadoop.hive.metastore.api.UnknownDBException; import org.apache.hadoop.hive.metastore.api.UnknownPartitionException; import org.apache.hadoop.hive.metastore.api.UnknownTableException; +import org.apache.hadoop.hive.metastore.model.MChangeVersion; import org.apache.hadoop.hive.metastore.model.MColumnDescriptor; import org.apache.hadoop.hive.metastore.model.MDBPrivilege; import org.apache.hadoop.hive.metastore.model.MDatabase; @@ -166,6 +167,7 @@ import com.google.common.collect.Lists; * to be made into a interface that can read both from a database and a * filestore. */ +@SuppressWarnings("unchecked") public class ObjectStore implements RawStore, Configurable { private static Properties prop = null; private static PersistenceManagerFactory pmf = null; @@ -181,11 +183,11 @@ public class ObjectStore implements RawStore, Configurable { NO_STATE, OPEN, COMMITED, ROLLBACK } - private static final Map<String, Class> PINCLASSMAP; + private static final Map<String, Class<?>> PINCLASSMAP; private static final String HOSTNAME; private static final String USER; static { - Map<String, Class> map = new HashMap<String, Class>(); + Map<String, Class<?>> map = new HashMap<String, Class<?>>(); map.put("table", MTable.class); map.put("storagedescriptor", MStorageDescriptor.class); map.put("serdeinfo", MSerDeInfo.class); @@ -735,9 +737,9 @@ public class ObjectStore implements RawStore, Configurable { query = pm.newQuery(queryStr); query.setResult("name"); query.setOrdering("name ascending"); - Collection names = (Collection) query.execute(); + Collection<?> names = (Collection<?>) query.execute(); databases = new ArrayList<String>(); - for (Iterator i = names.iterator(); i.hasNext();) { + for (Iterator<?> i = names.iterator(); i.hasNext();) { databases.add((String) i.next()); } commited = commitTransaction(); @@ -1037,9 +1039,9 @@ public class ObjectStore implements RawStore, Configurable { query.declareParameters("java.lang.String dbName"); query.setResult("tableName"); query.setOrdering("tableName ascending"); - Collection names = (Collection) query.execute(dbName); + Collection<?> names = (Collection<?>) query.execute(dbName); tbls = new ArrayList<String>(); - for (Iterator i = names.iterator(); i.hasNext();) { + for (Iterator<?> i = names.iterator(); i.hasNext();) { tbls.add((String) i.next()); } commited = commitTransaction(); @@ -1225,8 +1227,8 @@ public class ObjectStore implements RawStore, Configurable { query = pm.newQuery(MTable.class); query.setFilter("database.name == db && tbl_names.contains(tableName)"); query.declareParameters("java.lang.String db, java.util.Collection tbl_names"); - Collection mtables = (Collection) query.execute(db, lowered_tbl_names); - for (Iterator iter = mtables.iterator(); iter.hasNext();) { + Collection<?> mtables = (Collection<?>) query.execute(db, lowered_tbl_names); + for (Iterator<?> iter = mtables.iterator(); iter.hasNext();) { tables.add(convertToTable((MTable) iter.next())); } committed = commitTransaction(); @@ -2075,8 +2077,8 @@ public class ObjectStore implements RawStore, Configurable { if (max > 0) { query.setRange(0, max); } - Collection names = (Collection) query.execute(dbName, tableName); - for (Iterator i = names.iterator(); i.hasNext();) { + Collection<?> names = (Collection<?>) query.execute(dbName, tableName); + for (Iterator<?> i = names.iterator(); i.hasNext();) { pns.add((String) i.next()); } if (query != null) { @@ -2100,7 +2102,7 @@ public class ObjectStore implements RawStore, Configurable { * you want results for. E.g., if resultsCol is partitionName, the Collection * has types of String, and if resultsCol is null, the types are MPartition. */ - private Collection getPartitionPsQueryResults(String dbName, String tableName, + private Collection<?> getPartitionPsQueryResults(String dbName, String tableName, List<String> part_vals, short max_parts, String resultsCol, QueryWrapper queryWrapper) throws MetaException, NoSuchObjectException { dbName = HiveStringUtils.normalizeIdentifier(dbName); @@ -2140,7 +2142,7 @@ public class ObjectStore implements RawStore, Configurable { query.setResult(resultsCol); } - return (Collection) query.execute(dbName, tableName, partNameMatcher); + return (Collection<?>) query.execute(dbName, tableName, partNameMatcher); } @Override @@ -2154,7 +2156,7 @@ public class ObjectStore implements RawStore, Configurable { try { openTransaction(); LOG.debug("executing listPartitionNamesPsWithAuth"); - Collection parts = getPartitionPsQueryResults(db_name, tbl_name, + Collection<?> parts = getPartitionPsQueryResults(db_name, tbl_name, part_vals, max_parts, null, queryWrapper); MTable mtbl = getMTable(db_name, tbl_name); for (Object o : parts) { @@ -2190,7 +2192,7 @@ public class ObjectStore implements RawStore, Configurable { try { openTransaction(); LOG.debug("Executing listPartitionNamesPs"); - Collection names = getPartitionPsQueryResults(dbName, tableName, + Collection<?> names = getPartitionPsQueryResults(dbName, tableName, part_vals, max_parts, "partitionName", queryWrapper); for (Object o : names) { partitionNames.add((String) o); @@ -2836,10 +2838,10 @@ public class ObjectStore implements RawStore, Configurable { String parameterDeclaration = makeParameterDeclarationStringObj(params); query.declareParameters(parameterDeclaration); query.setFilter(queryFilterString); - Collection names = (Collection)query.executeWithMap(params); + Collection<?> names = (Collection<?>)query.executeWithMap(params); // have to emulate "distinct", otherwise tables with the same name may be returned Set<String> tableNamesSet = new HashSet<String>(); - for (Iterator i = names.iterator(); i.hasNext();) { + for (Iterator<?> i = names.iterator(); i.hasNext();) { tableNamesSet.add((String) i.next()); } tableNames = new ArrayList<String>(tableNamesSet); @@ -2889,9 +2891,9 @@ public class ObjectStore implements RawStore, Configurable { query.declareParameters(parameterDeclaration); query.setOrdering("partitionName ascending"); query.setResult("partitionName"); - Collection names = (Collection) query.executeWithMap(params); + Collection<?> names = (Collection<?>) query.executeWithMap(params); partNames = new ArrayList<String>(); - for (Iterator i = names.iterator(); i.hasNext();) { + for (Iterator<?> i = names.iterator(); i.hasNext();) { partNames.add((String) i.next()); } LOG.debug("Done executing query for listMPartitionNamesByFilter"); @@ -3364,8 +3366,8 @@ public class ObjectStore implements RawStore, Configurable { + "order by indexName asc"); query.declareParameters("java.lang.String t1, java.lang.String t2"); query.setResult("indexName"); - Collection names = (Collection) query.execute(dbName, origTableName); - for (Iterator i = names.iterator(); i.hasNext();) { + Collection<?> names = (Collection<?>) query.execute(dbName, origTableName); + for (Iterator<?> i = names.iterator(); i.hasNext();) { pns.add((String) i.next()); } success = commitTransaction(); @@ -3770,9 +3772,9 @@ public class ObjectStore implements RawStore, Configurable { LOG.debug("Executing listAllRoleNames"); query = pm.newQuery("select roleName from org.apache.hadoop.hive.metastore.model.MRole"); query.setResult("roleName"); - Collection names = (Collection) query.execute(); + Collection<?> names = (Collection<?>) query.execute(); List<String> roleNames = new ArrayList<String>(); - for (Iterator i = names.iterator(); i.hasNext();) { + for (Iterator<?> i = names.iterator(); i.hasNext();) { roleNames.add((String) i.next()); } success = commitTransaction(); @@ -7514,6 +7516,7 @@ public class ObjectStore implements RawStore, Configurable { openTransaction(); MFunction mfunc = convertToMFunction(func); pm.makePersistent(mfunc); + incrementChangeVersionNoTx(IMetaStoreClient.PERMANENT_FUNCTION_CV); committed = commitTransaction(); } finally { if (!committed) { @@ -7540,6 +7543,8 @@ public class ObjectStore implements RawStore, Configurable { throw new MetaException("function " + funcName + " doesn't exist"); } + incrementChangeVersionNoTx(IMetaStoreClient.PERMANENT_FUNCTION_CV); + // For now only alter name, owner, class name, type oldf.setFunctionName(HiveStringUtils.normalizeIdentifier(newf.getFunctionName())); oldf.setDatabase(newf.getDatabase()); @@ -7557,6 +7562,15 @@ public class ObjectStore implements RawStore, Configurable { } } + private void incrementChangeVersionNoTx(String topic) { + MChangeVersion cv = getMChangeVersionNoTx(topic); + if (cv == null) { + cv = new MChangeVersion(topic, 1); + pm.makePersistent(cv); + } + cv.setVersion(cv.getVersion() + 1); + } + @Override public void dropFunction(String dbName, String funcName) throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException { @@ -7569,6 +7583,7 @@ public class ObjectStore implements RawStore, Configurable { // TODO: When function privileges are implemented, they should be deleted here. pm.deletePersistentAll(mfunc); } + incrementChangeVersionNoTx(IMetaStoreClient.PERMANENT_FUNCTION_CV); success = commitTransaction(); } finally { if (!success) { @@ -7602,6 +7617,27 @@ public class ObjectStore implements RawStore, Configurable { return mfunc; } + private MChangeVersion getMChangeVersionNoTx(String topic) { + Query query = null; + try { + query = pm.newQuery(MChangeVersion.class, "topic == topicName"); + query.declareParameters("java.lang.String topicName"); + query.setUnique(true); + Object obj = query.execute(topic); + if (obj == null) { + return null; + } else { + MChangeVersion mversion = (MChangeVersion)obj; + pm.retrieve(mversion); + return mversion; + } + } finally { + if (query != null) { + query.closeAll(); + } + } + } + @Override public Function getFunction(String dbName, String funcName) throws MetaException { boolean commited = false; @@ -7619,6 +7655,21 @@ public class ObjectStore implements RawStore, Configurable { } @Override + public long getChangeVersion(String topic) throws MetaException { + boolean commited = false; + try { + openTransaction(); + long result = getMChangeVersionNoTx(topic).getVersion(); + commited = commitTransaction(); + return result; + } finally { + if (!commited) { + rollbackTransaction(); + } + } + } + + @Override public List<Function> getAllFunctions() throws MetaException { boolean commited = false; try { @@ -7663,9 +7714,9 @@ public class ObjectStore implements RawStore, Configurable { query.declareParameters("java.lang.String dbName"); query.setResult("functionName"); query.setOrdering("functionName ascending"); - Collection names = (Collection) query.execute(dbName); + Collection<?> names = (Collection<?>) query.execute(dbName); funcs = new ArrayList<String>(); - for (Iterator i = names.iterator(); i.hasNext();) { + for (Iterator<?> i = names.iterator(); i.hasNext();) { funcs.add((String) i.next()); } commited = commitTransaction(); @@ -7690,7 +7741,8 @@ public class ObjectStore implements RawStore, Configurable { query = pm.newQuery(MNotificationLog.class, "eventId > lastEvent"); query.declareParameters("java.lang.Long lastEvent"); query.setOrdering("eventId ascending"); - Collection<MNotificationLog> events = (Collection) query.execute(lastEvent); + Collection<MNotificationLog> events = (Collection<MNotificationLog>) + query.execute(lastEvent); commited = commitTransaction(); if (events == null) { return null; http://git-wip-us.apache.org/repos/asf/hive/blob/5be9c92f/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java index cbd5957..e49f757 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java @@ -660,4 +660,7 @@ public interface RawStore extends Configurable { */ @InterfaceStability.Evolving int getDatabaseCount() throws MetaException; + + @InterfaceStability.Evolving + long getChangeVersion(String topic) throws MetaException; } http://git-wip-us.apache.org/repos/asf/hive/blob/5be9c92f/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java index 81f1324..61257f0 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -100,6 +101,7 @@ public class HBaseReadWrite implements MetadataStore { final static String TABLE_TABLE = "HBMS_TBLS"; final static String USER_TO_ROLE_TABLE = "HBMS_USER_TO_ROLE"; final static String FILE_METADATA_TABLE = "HBMS_FILE_METADATA"; + final static String CHANGE_VERSION_TABLE = "HBMS_CHANGE_VERSION"; final static byte[] CATALOG_CF = "c".getBytes(HBaseUtils.ENCODING); final static byte[] STATS_CF = "s".getBytes(HBaseUtils.ENCODING); final static String NO_CACHE_CONF = "no.use.cache"; @@ -109,7 +111,7 @@ public class HBaseReadWrite implements MetadataStore { public final static String[] tableNames = { AGGR_STATS_TABLE, DB_TABLE, FUNC_TABLE, GLOBAL_PRIVS_TABLE, PART_TABLE, USER_TO_ROLE_TABLE, ROLE_TABLE, SD_TABLE, SECURITY_TABLE, SEQUENCES_TABLE, - TABLE_TABLE, FILE_METADATA_TABLE }; + TABLE_TABLE, FILE_METADATA_TABLE, CHANGE_VERSION_TABLE }; public final static Map<String, List<byte[]>> columnFamilies = new HashMap<> (tableNames.length); static { @@ -126,6 +128,7 @@ public class HBaseReadWrite implements MetadataStore { columnFamilies.put(TABLE_TABLE, Arrays.asList(CATALOG_CF, STATS_CF)); // Stats CF will contain PPD stats. columnFamilies.put(FILE_METADATA_TABLE, Arrays.asList(CATALOG_CF, STATS_CF)); + columnFamilies.put(CHANGE_VERSION_TABLE, Arrays.asList(CATALOG_CF)); } final static byte[] AGGR_STATS_BLOOM_COL = "b".getBytes(HBaseUtils.ENCODING); @@ -138,6 +141,7 @@ public class HBaseReadWrite implements MetadataStore { private final static byte[] MASTER_KEY_COL = "mk".getBytes(HBaseUtils.ENCODING); private final static byte[] GLOBAL_PRIVS_KEY = "gp".getBytes(HBaseUtils.ENCODING); private final static byte[] SEQUENCES_KEY = "seq".getBytes(HBaseUtils.ENCODING); + private final static byte[] CV_COL = "cv".getBytes(HBaseUtils.ENCODING); private final static int TABLES_TO_CACHE = 10; // False positives are very bad here because they cause us to invalidate entries we shouldn't. // Space used and # of hash functions grows in proportion to ln of num bits so a 10x increase @@ -2136,6 +2140,31 @@ public class HBaseReadWrite implements MetadataStore { colStats.setStatsDesc(csd); return colStats; } + + /********************************************************************************************** + * Change version related methods + *********************************************************************************************/ + + public long getChangeVersion(String topic) throws IOException { + byte[] key = HBaseUtils.buildKey(topic); + byte[] result = read(CHANGE_VERSION_TABLE, key, CATALOG_CF, CV_COL); + return (result == null) ? -1 : Long.valueOf(new String(result, HBaseUtils.ENCODING)); + } + + // TODO: The way this is called now is not ideal. It's all encapsulated and stuff, but, + // before the txns (consistent HBase writes) are properly implemented, we should at least + // put this in the same RPC with real updates. But there are no guarantees anyway, so... + public void incrementChangeVersion(String topic) throws IOException { + byte[] key = HBaseUtils.buildKey(topic); + byte[] serialized = read(CHANGE_VERSION_TABLE, key, CATALOG_CF, CV_COL); + long val = 0; + if (serialized != null) { + val = Long.valueOf(new String(serialized, HBaseUtils.ENCODING)); + } + store(CHANGE_VERSION_TABLE, key, CATALOG_CF, CV_COL, + new Long(val + 1).toString().getBytes(HBaseUtils.ENCODING)); + } + /********************************************************************************************** * File metadata related methods *********************************************************************************************/ http://git-wip-us.apache.org/repos/asf/hive/blob/5be9c92f/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java index 317913d..f7d5735 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.FileMetadataHandler; import org.apache.hadoop.hive.metastore.HiveMetaStore; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.PartFilterExprUtil; import org.apache.hadoop.hive.metastore.PartitionExpressionProxy; import org.apache.hadoop.hive.metastore.RawStore; @@ -2249,7 +2250,9 @@ public class HBaseStore implements RawStore { boolean commit = false; openTransaction(); try { - getHBase().putFunction(func); + HBaseReadWrite hbase = getHBase(); + hbase.putFunction(func); + hbase.incrementChangeVersion(IMetaStoreClient.PERMANENT_FUNCTION_CV); commit = true; } catch (IOException e) { LOG.error("Unable to create function", e); @@ -2265,7 +2268,9 @@ public class HBaseStore implements RawStore { boolean commit = false; openTransaction(); try { - getHBase().putFunction(newFunction); + HBaseReadWrite hbase = getHBase(); + hbase.putFunction(newFunction); + hbase.incrementChangeVersion(IMetaStoreClient.PERMANENT_FUNCTION_CV); commit = true; } catch (IOException e) { LOG.error("Unable to alter function ", e); @@ -2281,7 +2286,9 @@ public class HBaseStore implements RawStore { boolean commit = false; openTransaction(); try { - getHBase().deleteFunction(dbName, funcName); + HBaseReadWrite hbase = getHBase(); + hbase.deleteFunction(dbName, funcName); + hbase.incrementChangeVersion(IMetaStoreClient.PERMANENT_FUNCTION_CV); commit = true; } catch (IOException e) { LOG.error("Unable to delete function" + e); @@ -2569,4 +2576,19 @@ public class HBaseStore implements RawStore { commitOrRoleBack(commit); } } + + @Override + public long getChangeVersion(String topic) throws MetaException { + openTransaction(); + boolean commit = true; + try { + return getHBase().getChangeVersion(topic); + } catch (IOException e) { + commit = false; + LOG.error("Unable to get change version", e); + throw new MetaException("Unable to get change version " + e.getMessage()); + } finally { + commitOrRoleBack(commit); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/5be9c92f/metastore/src/model/org/apache/hadoop/hive/metastore/model/MChangeVersion.java ---------------------------------------------------------------------- diff --git a/metastore/src/model/org/apache/hadoop/hive/metastore/model/MChangeVersion.java b/metastore/src/model/org/apache/hadoop/hive/metastore/model/MChangeVersion.java new file mode 100644 index 0000000..d498c86 --- /dev/null +++ b/metastore/src/model/org/apache/hadoop/hive/metastore/model/MChangeVersion.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.model; + +public class MChangeVersion { + private String topic; + private long version; + + public MChangeVersion() { + } + + public MChangeVersion(String topic, long version) { + this.setTopic(topic); + this.setVersion(version); + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public long getVersion() { + return version; + } + + public void setVersion(long version) { + this.version = version; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/5be9c92f/metastore/src/model/package.jdo ---------------------------------------------------------------------- diff --git a/metastore/src/model/package.jdo b/metastore/src/model/package.jdo index b41b3d8..7385a13 100644 --- a/metastore/src/model/package.jdo +++ b/metastore/src/model/package.jdo @@ -1019,7 +1019,18 @@ </field> </class> - + <class name="MChangeVersion" table="CHANGE_VERSION" identity-type="datastore" detachable="true"> + <datastore-identity> + <column name="CHANGE_VERSION_ID"/> + </datastore-identity> + <field name="version"> + <column name="VERSION" jdbc-type="BIGINT" allows-null="false"/> + </field> + <field name="topic"> + <column name="TOPIC" length="255" jdbc-type="VARCHAR" allows-null="false"/> + <index name="UniqueTopic" unique="true"/> + </field> + </class> </package> </jdo> http://git-wip-us.apache.org/repos/asf/hive/blob/5be9c92f/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java index 660dd4e..94ca835 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java @@ -815,4 +815,9 @@ public class DummyRawStoreControlledCommit implements RawStore, Configurable { public FileMetadataHandler getFileMetadataHandler(FileMetadataExprType type) { return null; } + + @Override + public long getChangeVersion(String topic) throws MetaException { + return 0; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/5be9c92f/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java index 9103102..b108f95 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java @@ -831,6 +831,11 @@ public class DummyRawStoreForJdoConnection implements RawStore { public FileMetadataHandler getFileMetadataHandler(FileMetadataExprType type) { return null; } + + @Override + public long getChangeVersion(String topic) throws MetaException { + return 0; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/5be9c92f/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 5c32f6f..a92c002 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -3494,4 +3494,12 @@ private void constructOneLBLocationMap(FileStatus fSta, throw new HiveException(e); } } + + public long getPermanenFunctionsChangeVersion() throws HiveException { + try { + return getMSC().getChangeVersion(IMetaStoreClient.PERMANENT_FUNCTION_CV); + } catch (TException e) { + throw new HiveException(e); + } + } };
