http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java new file mode 100644 index 0000000..787b9b2 --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java @@ -0,0 +1,425 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hadoop.hive.metastore.messaging; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; +import java.util.regex.PatternSyntaxException; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.api.Catalog; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.SQLForeignKey; +import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint; +import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; +import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TxnToWriteId; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.events.AcidWriteEvent; +import org.apache.hadoop.hive.metastore.messaging.json.JSONAbortTxnMessage; +import org.apache.hadoop.hive.metastore.messaging.json.JSONAcidWriteMessage; +import org.apache.hadoop.hive.metastore.messaging.json.JSONAddForeignKeyMessage; +import org.apache.hadoop.hive.metastore.messaging.json.JSONAddNotNullConstraintMessage; +import org.apache.hadoop.hive.metastore.messaging.json.JSONAddPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.json.JSONAddPrimaryKeyMessage; +import org.apache.hadoop.hive.metastore.messaging.json.JSONAddUniqueConstraintMessage; +import 
org.apache.hadoop.hive.metastore.messaging.json.JSONAllocWriteIdMessage;
import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterCatalogMessage;
import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterDatabaseMessage;
import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterPartitionMessage;
import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterTableMessage;
import org.apache.hadoop.hive.metastore.messaging.json.JSONCommitTxnMessage;
import org.apache.hadoop.hive.metastore.messaging.json.JSONCreateCatalogMessage;
import org.apache.hadoop.hive.metastore.messaging.json.JSONCreateDatabaseMessage;
import org.apache.hadoop.hive.metastore.messaging.json.JSONCreateFunctionMessage;
import org.apache.hadoop.hive.metastore.messaging.json.JSONCreateTableMessage;
import org.apache.hadoop.hive.metastore.messaging.json.JSONDropCatalogMessage;
import org.apache.hadoop.hive.metastore.messaging.json.JSONDropConstraintMessage;
import org.apache.hadoop.hive.metastore.messaging.json.JSONDropDatabaseMessage;
import org.apache.hadoop.hive.metastore.messaging.json.JSONDropFunctionMessage;
import org.apache.hadoop.hive.metastore.messaging.json.JSONDropPartitionMessage;
import org.apache.hadoop.hive.metastore.messaging.json.JSONDropTableMessage;
import org.apache.hadoop.hive.metastore.messaging.json.JSONInsertMessage;
import org.apache.hadoop.hive.metastore.messaging.json.JSONOpenTxnMessage;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TJSONProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.filterMapkeys;

/**
 * Builds the JSON-backed notification messages of the metastore and offers static helpers that
 * convert Thrift objects to and from their {@link TJSONProtocol} string form.
 *
 * <p>The instance-level {@code buildXxxMessage} methods stamp every message with the configured
 * metastore URI, the Kerberos principal and the current time in seconds. The static
 * {@code createXxxObjJson} / {@code getTObj*} helpers do not need an instance.
 */
public class MessageBuilder {
  private static final Logger LOG = LoggerFactory.getLogger(MessageBuilder.class);

  // Event-type names. These strings are written into notifications, so their values must not be
  // altered (note that ALLOC_WRITE_ID_EVENT and ACID_WRITE_EVENT carry an "_EVENT" suffix in the
  // value itself, unlike the others).
  public static final String ADD_PARTITION_EVENT = "ADD_PARTITION";
  public static final String ALTER_PARTITION_EVENT = "ALTER_PARTITION";
  public static final String DROP_PARTITION_EVENT = "DROP_PARTITION";
  public static final String CREATE_TABLE_EVENT = "CREATE_TABLE";
  public static final String ALTER_TABLE_EVENT = "ALTER_TABLE";
  public static final String DROP_TABLE_EVENT = "DROP_TABLE";
  public static final String CREATE_DATABASE_EVENT = "CREATE_DATABASE";
  public static final String ALTER_DATABASE_EVENT = "ALTER_DATABASE";
  public static final String DROP_DATABASE_EVENT = "DROP_DATABASE";
  public static final String INSERT_EVENT = "INSERT";
  public static final String CREATE_FUNCTION_EVENT = "CREATE_FUNCTION";
  public static final String DROP_FUNCTION_EVENT = "DROP_FUNCTION";
  public static final String ADD_PRIMARYKEY_EVENT = "ADD_PRIMARYKEY";
  public static final String ADD_FOREIGNKEY_EVENT = "ADD_FOREIGNKEY";
  public static final String ADD_UNIQUECONSTRAINT_EVENT = "ADD_UNIQUECONSTRAINT";
  public static final String ADD_NOTNULLCONSTRAINT_EVENT = "ADD_NOTNULLCONSTRAINT";
  public static final String DROP_CONSTRAINT_EVENT = "DROP_CONSTRAINT";
  public static final String CREATE_ISCHEMA_EVENT = "CREATE_ISCHEMA";
  public static final String ALTER_ISCHEMA_EVENT = "ALTER_ISCHEMA";
  public static final String DROP_ISCHEMA_EVENT = "DROP_ISCHEMA";
  public static final String ADD_SCHEMA_VERSION_EVENT = "ADD_SCHEMA_VERSION";
  public static final String ALTER_SCHEMA_VERSION_EVENT = "ALTER_SCHEMA_VERSION";
  public static final String DROP_SCHEMA_VERSION_EVENT = "DROP_SCHEMA_VERSION";
  public static final String CREATE_CATALOG_EVENT = "CREATE_CATALOG";
  public static final String DROP_CATALOG_EVENT = "DROP_CATALOG";
  public static final String OPEN_TXN_EVENT = "OPEN_TXN";
  public static final String COMMIT_TXN_EVENT = "COMMIT_TXN";
  public static final String ABORT_TXN_EVENT = "ABORT_TXN";
  public static final String ALLOC_WRITE_ID_EVENT = "ALLOC_WRITE_ID_EVENT";
  public static final String ALTER_CATALOG_EVENT = "ALTER_CATALOG";
  public static final String ACID_WRITE_EVENT = "ACID_WRITE_EVENT";

  protected static final Configuration conf = MetastoreConf.newMetastoreConf();

  private static final String MS_SERVER_URL = MetastoreConf
      .getVar(conf, MetastoreConf.ConfVars.THRIFT_URIS, "");
  private static final String MS_SERVICE_PRINCIPAL =
      MetastoreConf.getVar(conf, MetastoreConf.ConfVars.KERBEROS_PRINCIPAL, "");

  /** Charset name handed to the Thrift (de)serializers. */
  private static final String JSON_CHARSET = "UTF-8";

  private static volatile MessageBuilder instance;
  private static final Object lock = new Object();

  /**
   * Predicates compiled from EVENT_NOTIFICATION_PARAMETERS_EXCLUDE_PATTERNS by {@link #init()}.
   * Declared volatile because it is written by {@link #init()} and read by static helpers that
   * do not synchronize.
   */
  private static volatile List<Predicate<String>> paramsFilter;

  /**
   * Returns the shared builder, creating and initializing it on first use.
   *
   * <p>The builder is published to {@link #instance} only after {@link #init()} has completed,
   * so no caller can observe a builder whose parameter filter has not been compiled, and a
   * failing {@link #init()} does not leave a half-initialized singleton behind.
   *
   * @return the shared, initialized instance
   * @throws IllegalStateException if {@link #init()} rejects the configured exclude patterns
   */
  public static MessageBuilder getInstance() {
    MessageBuilder result = instance;
    if (result == null) {
      synchronized (lock) {
        result = instance;
        if (result == null) {
          result = new MessageBuilder();
          result.init();
          // Publish last: the volatile write makes the initialized state visible to readers.
          instance = result;
        }
      }
    }
    return result;
  }

  /**
   * Compiles the configured parameter exclude patterns into {@link #paramsFilter}.
   *
   * @throws IllegalStateException if one of the patterns is not a valid regular expression; the
   *     originating {@link PatternSyntaxException} is attached as the cause
   */
  public void init() {
    List<String> excludePatterns = Arrays.asList(MetastoreConf
        .getTrimmedStringsVar(conf,
            MetastoreConf.ConfVars.EVENT_NOTIFICATION_PARAMETERS_EXCLUDE_PATTERNS));
    try {
      paramsFilter = MetaStoreUtils.compilePatternsToPredicates(excludePatterns);
    } catch (PatternSyntaxException e) {
      LOG.error("Regex pattern compilation failed. Verify that "
          + "metastore.notification.parameters.exclude.patterns has valid patterns.", e);
      throw new IllegalStateException("Regex pattern compilation failed. " + e.getMessage(), e);
    }
  }

  /**
   * @param db the database that was created
   * @return a create-database message
   */
  public CreateDatabaseMessage buildCreateDatabaseMessage(Database db) {
    return new JSONCreateDatabaseMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db, now());
  }

  /**
   * @param beforeDb the database before the alter
   * @param afterDb the database after the alter
   * @return an alter-database message
   */
  public AlterDatabaseMessage buildAlterDatabaseMessage(Database beforeDb, Database afterDb) {
    return new JSONAlterDatabaseMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL,
        beforeDb, afterDb, now());
  }

  /**
   * @param db the database that was dropped; only its name goes into the message
   * @return a drop-database message
   */
  public DropDatabaseMessage buildDropDatabaseMessage(Database db) {
    return new JSONDropDatabaseMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db.getName(), now());
  }

  /**
   * @param table the table that was created
   * @param fileIter iterator of files passed on to the message
   * @return a create-table message
   */
  public CreateTableMessage buildCreateTableMessage(Table table, Iterator<String> fileIter) {
    return new JSONCreateTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table, fileIter, now());
  }

  /**
   * @param before the table before the alter
   * @param after the table after the alter
   * @param isTruncateOp flag passed on to the message to denote a truncate
   * @param writeId write id passed on to the message
   * @return an alter-table message
   */
  public AlterTableMessage buildAlterTableMessage(Table before, Table after, boolean isTruncateOp,
      Long writeId) {
    return new JSONAlterTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, before, after,
        isTruncateOp, writeId, now());
  }

  /**
   * @param table the table that was dropped
   * @return a drop-table message
   */
  public DropTableMessage buildDropTableMessage(Table table) {
    return new JSONDropTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table, now());
  }

  /**
   * @param table the table the partitions belong to
   * @param partitionsIterator iterator over the added partitions
   * @param partitionFileIter iterator over the files of the added partitions
   * @return an add-partition message
   */
  public AddPartitionMessage buildAddPartitionMessage(Table table,
      Iterator<Partition> partitionsIterator, Iterator<PartitionFiles> partitionFileIter) {
    return new JSONAddPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table,
        partitionsIterator, partitionFileIter, now());
  }

  /**
   * @param table the table the partition belongs to
   * @param before the partition before the alter
   * @param after the partition after the alter
   * @param isTruncateOp flag passed on to the message to denote a truncate
   * @param writeId write id passed on to the message
   * @return an alter-partition message
   */
  public AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition before,
      Partition after, boolean isTruncateOp, Long writeId) {
    return new JSONAlterPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL,
        table, before, after, isTruncateOp, writeId, now());
  }

  /**
   * Builds a drop-partition message. The partition iterator is consumed completely here, because
   * the key/value maps are materialized into a list before the message is constructed.
   *
   * @param table the table the partitions belong to
   * @param partitionsIterator iterator over the dropped partitions
   * @return a drop-partition message
   */
  public DropPartitionMessage buildDropPartitionMessage(Table table,
      Iterator<Partition> partitionsIterator) {
    return new JSONDropPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table,
        getPartitionKeyValues(table, partitionsIterator), now());
  }

  /**
   * @param fn the function that was created
   * @return a create-function message
   */
  public CreateFunctionMessage buildCreateFunctionMessage(Function fn) {
    return new JSONCreateFunctionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, fn, now());
  }

  /**
   * @param fn the function that was dropped
   * @return a drop-function message
   */
  public DropFunctionMessage buildDropFunctionMessage(Function fn) {
    return new JSONDropFunctionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, fn, now());
  }

  /**
   * @param tableObj the table the insert went into
   * @param partObj the partition the insert went into
   * @param replace flag passed on to the message
   * @param fileIter iterator of files passed on to the message
   * @return an insert message
   */
  public InsertMessage buildInsertMessage(Table tableObj, Partition partObj,
      boolean replace, Iterator<String> fileIter) {
    return new JSONInsertMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL,
        tableObj, partObj, replace, fileIter, now());
  }

  /**
   * @param pks the primary keys that were added
   * @return an add-primary-key message
   */
  public AddPrimaryKeyMessage buildAddPrimaryKeyMessage(List<SQLPrimaryKey> pks) {
    return new JSONAddPrimaryKeyMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, pks, now());
  }

  /**
   * @param fks the foreign keys that were added
   * @return an add-foreign-key message
   */
  public AddForeignKeyMessage buildAddForeignKeyMessage(List<SQLForeignKey> fks) {
    return new JSONAddForeignKeyMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, fks, now());
  }

  /**
   * @param uks the unique constraints that were added
   * @return an add-unique-constraint message
   */
  public AddUniqueConstraintMessage buildAddUniqueConstraintMessage(List<SQLUniqueConstraint> uks) {
    return new JSONAddUniqueConstraintMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, uks, now());
  }

  /**
   * @param nns the not-null constraints that were added
   * @return an add-not-null-constraint message
   */
  public AddNotNullConstraintMessage buildAddNotNullConstraintMessage(
      List<SQLNotNullConstraint> nns) {
    return new JSONAddNotNullConstraintMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, nns, now());
  }

  /**
   * @param dbName database of the constrained table
   * @param tableName table the constraint was dropped from
   * @param constraintName name of the dropped constraint
   * @return a drop-constraint message
   */
  public DropConstraintMessage buildDropConstraintMessage(String dbName, String tableName,
      String constraintName) {
    return new JSONDropConstraintMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, dbName, tableName,
        constraintName, now());
  }

  /**
   * @param catalog the catalog that was created; only its name goes into the message
   * @return a create-catalog message
   */
  public CreateCatalogMessage buildCreateCatalogMessage(Catalog catalog) {
    return new JSONCreateCatalogMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, catalog.getName(),
        now());
  }

  /**
   * @param beforeCat the catalog before the alter
   * @param afterCat the catalog after the alter
   * @return an alter-catalog message
   */
  public AlterCatalogMessage buildAlterCatalogMessage(Catalog beforeCat, Catalog afterCat) {
    return new JSONAlterCatalogMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL,
        beforeCat, afterCat, now());
  }

  /**
   * @param catalog the catalog that was dropped; only its name goes into the message
   * @return a drop-catalog message
   */
  public DropCatalogMessage buildDropCatalogMessage(Catalog catalog) {
    return new JSONDropCatalogMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, catalog.getName(),
        now());
  }

  /**
   * @param fromTxnId first transaction id of the opened range
   * @param toTxnId last transaction id of the opened range
   * @return an open-txn message
   */
  public OpenTxnMessage buildOpenTxnMessage(Long fromTxnId, Long toTxnId) {
    return new JSONOpenTxnMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, fromTxnId, toTxnId, now());
  }

  /**
   * @param txnId id of the committed transaction
   * @return a commit-txn message
   */
  public CommitTxnMessage buildCommitTxnMessage(Long txnId) {
    return new JSONCommitTxnMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnId, now());
  }

  /**
   * @param txnId id of the aborted transaction
   * @return an abort-txn message
   */
  public AbortTxnMessage buildAbortTxnMessage(Long txnId) {
    return new JSONAbortTxnMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnId, now());
  }

  /**
   * @param txnToWriteIdList transaction-to-write-id pairs passed on to the message
   * @param dbName database of the table
   * @param tableName table the write ids were allocated for
   * @return an alloc-write-id message
   */
  public AllocWriteIdMessage buildAllocWriteIdMessage(List<TxnToWriteId> txnToWriteIdList,
      String dbName, String tableName) {
    return new JSONAllocWriteIdMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnToWriteIdList,
        dbName, tableName, now());
  }

  /**
   * @param acidWriteEvent the ACID write event to describe
   * @param files iterator of files passed on to the message
   * @return an acid-write message
   */
  public AcidWriteMessage buildAcidWriteMessage(AcidWriteEvent acidWriteEvent,
      Iterator<String> files) {
    // Unlike the other JSON messages, this constructor takes the timestamp as its third argument.
    return new JSONAcidWriteMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, now(), acidWriteEvent,
        files);
  }

  /** @return the current wall-clock time in whole seconds since the epoch */
  private long now() {
    return System.currentTimeMillis() / 1000;
  }

  /**
   * Serializes a Thrift object to its {@link TJSONProtocol} string form. A fresh serializer is
   * created per call, as the original per-method code did.
   */
  private static String toJson(TBase thriftObj) throws TException {
    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
    return serializer.toString(thriftObj, JSON_CHARSET);
  }

  /**
   * @param primaryKeyObj the primary key to serialize
   * @return the Thrift-JSON form of {@code primaryKeyObj}
   * @throws TException if serialization fails
   */
  public static String createPrimaryKeyObjJson(SQLPrimaryKey primaryKeyObj) throws TException {
    return toJson(primaryKeyObj);
  }

  /**
   * @param foreignKeyObj the foreign key to serialize
   * @return the Thrift-JSON form of {@code foreignKeyObj}
   * @throws TException if serialization fails
   */
  public static String createForeignKeyObjJson(SQLForeignKey foreignKeyObj) throws TException {
    return toJson(foreignKeyObj);
  }

  /**
   * @param uniqueConstraintObj the unique constraint to serialize
   * @return the Thrift-JSON form of {@code uniqueConstraintObj}
   * @throws TException if serialization fails
   */
  public static String createUniqueConstraintObjJson(SQLUniqueConstraint uniqueConstraintObj)
      throws TException {
    return toJson(uniqueConstraintObj);
  }

  /**
   * @param notNullConstraintObj the not-null constraint to serialize
   * @return the Thrift-JSON form of {@code notNullConstraintObj}
   * @throws TException if serialization fails
   */
  public static String createNotNullConstraintObjJson(SQLNotNullConstraint notNullConstraintObj)
      throws TException {
    return toJson(notNullConstraintObj);
  }

  /**
   * @param dbObj the database to serialize
   * @return the Thrift-JSON form of {@code dbObj}
   * @throws TException if serialization fails
   */
  public static String createDatabaseObjJson(Database dbObj) throws TException {
    return toJson(dbObj);
  }

  /**
   * @param catObj the catalog to serialize
   * @return the Thrift-JSON form of {@code catObj}
   * @throws TException if serialization fails
   */
  public static String createCatalogObjJson(Catalog catObj) throws TException {
    return toJson(catObj);
  }

  /**
   * Serializes a table after filtering its parameters.
   *
   * <p>Parameters of the table whose key matches any pattern provided through
   * EVENT_NOTIFICATION_PARAMETERS_EXCLUDE_PATTERNS are removed by the filter.
   *
   * @param tableObj the table to serialize
   * @return the Thrift-JSON form of {@code tableObj}
   * @throws TException if serialization fails
   */
  public static String createTableObjJson(Table tableObj) throws TException {
    // NOTE(review): the table's own parameter map is handed to the filter, so the removal
    // presumably mutates the caller's object - confirm against MetaStoreUtils.filterMapkeys.
    // NOTE(review): paramsFilter is only set by init(); if getInstance() was never called it is
    // still null here - confirm that filterMapkeys tolerates a null filter list.
    filterMapkeys(tableObj.getParameters(), paramsFilter);
    return toJson(tableObj);
  }

  /**
   * Serializes a partition after filtering its parameters.
   *
   * <p>Parameters of the partition whose key matches any pattern provided through
   * EVENT_NOTIFICATION_PARAMETERS_EXCLUDE_PATTERNS are removed by the filter.
   *
   * @param partitionObj the partition to serialize
   * @return the Thrift-JSON form of {@code partitionObj}
   * @throws TException if serialization fails
   */
  public static String createPartitionObjJson(Partition partitionObj) throws TException {
    // NOTE(review): same caveats as in createTableObjJson (caller's map is passed to the filter;
    // paramsFilter may still be null if init() has not run) - confirm.
    filterMapkeys(partitionObj.getParameters(), paramsFilter);
    return toJson(partitionObj);
  }

  /**
   * @param functionObj the function to serialize
   * @return the Thrift-JSON form of {@code functionObj}
   * @throws TException if serialization fails
   */
  public static String createFunctionObjJson(Function functionObj) throws TException {
    return toJson(functionObj);
  }

  /**
   * Reads the {@code "tableObjJson"} field of the given JSON tree and deserializes it to a table.
   *
   * @param jsonTree JSON tree that carries a {@code "tableObjJson"} field
   * @return the deserialized table
   * @throws Exception if the field is absent (NullPointerException) or deserialization fails
   */
  public static Table getTableObj(ObjectNode jsonTree) throws Exception {
    TDeserializer deSerializer = new TDeserializer(new TJSONProtocol.Factory());
    Table tableObj = new Table();
    String tableJson = jsonTree.get("tableObjJson").asText();
    deSerializer.deserialize(tableObj, tableJson, JSON_CHARSET);
    return tableObj;
  }

  /*
   * TODO: Some thoughts here: we have a current todo to move some of these methods over to
   * MessageFactory instead of being here, so we can override them, but before we move them over,
   * we should keep the following in mind:
   *
   * a) We should return Iterables, not Lists. That makes sure that we can be memory-safe when
   * implementing it rather than forcing ourselves down a path wherein returning List is part of
   * our interface, and then people use .size() or some such, which makes us need to materialize
   * the entire list and not change. Also, returning Iterables allows us to do things like
   * Iterables.transform for some of these.
   * b) We should not have "magic" names like "tableObjJson", because that breaks expectation of a
   * couple of things - firstly, that of serialization format, although that is fine for this
   * JSONMessageEncoder, and secondly, that makes us just have a number of mappings, one for each
   * obj type, and sometimes, as the case is with alter, have multiples. Also, any event-specific
   * item belongs in that event message / event itself, as opposed to in the factory. It's okay to
   * have utility accessor methods here that are used by each of the messages to provide accessors.
   * I'm adding a couple of those here.
   */

  /**
   * Deserializes one Thrift-JSON string into a new instance of {@code objClass}.
   *
   * @param tSerialized the Thrift-JSON text
   * @param objClass class to instantiate through its no-argument constructor
   * @return the populated object
   * @throws Exception if the class cannot be instantiated or deserialization fails
   */
  public static TBase getTObj(String tSerialized, Class<? extends TBase> objClass)
      throws Exception {
    TDeserializer thriftDeSerializer = new TDeserializer(new TJSONProtocol.Factory());
    // Class.newInstance() is deprecated; going through the constructor wraps constructor
    // failures in InvocationTargetException instead of rethrowing them unchecked.
    TBase obj = objClass.getDeclaredConstructor().newInstance();
    thriftDeSerializer.deserialize(obj, tSerialized, JSON_CHARSET);
    return obj;
  }

  /**
   * Returns a lazy view that deserializes each string of {@code objRefStrs} on iteration.
   *
   * <p>{@link Iterables#transform} does not invoke the function until the result is iterated, so
   * nothing is deserialized inside this method. A failure for an element therefore surfaces
   * while iterating, as a {@link RuntimeException} whose cause is the original exception. (The
   * previous try/catch around the transform call could never see those failures and was removed.)
   *
   * @param objRefStrs Thrift-JSON strings
   * @param objClass class each string is deserialized into
   * @return a lazily transforming iterable
   * @throws Exception declared for source compatibility with existing callers
   */
  public static Iterable<? extends TBase> getTObjs(
      Iterable<String> objRefStrs, final Class<? extends TBase> objClass) throws Exception {
    com.google.common.base.Function<String, TBase> deserialize = (@Nullable String objStr) -> {
      try {
        return getTObj(objStr, objClass);
      } catch (Exception e) {
        // Function.apply cannot throw checked exceptions, so wrap and keep the cause.
        throw new RuntimeException(e);
      }
    };
    return Iterables.transform(objRefStrs, deserialize);
  }

  /**
   * Variant of {@link #getTObjs(Iterable, Class)} that takes the strings from the JSON array
   * stored under {@code objRefListName}.
   *
   * <p>If we do not need this format of accessor using ObjectNode, this is a candidate for
   * removal as well.
   *
   * @param jsonTree JSON tree holding the array
   * @param objRefListName field name of the array
   * @param objClass class each element is deserialized into
   * @return a lazily transforming iterable
   * @throws Exception declared for source compatibility with existing callers
   */
  public static Iterable<? extends TBase> getTObjs(
      ObjectNode jsonTree, String objRefListName, final Class<? extends TBase> objClass)
      throws Exception {
    Iterable<JsonNode> jsonArrayIterator = jsonTree.get(objRefListName);
    return getTObjs(Iterables.transform(jsonArrayIterator, JsonNode::asText), objClass);
  }

  /**
   * Pairs the table's partition key names with the partition's values, position by position.
   *
   * @param table table whose partition keys name the entries
   * @param partition partition whose values fill the entries; it must provide at least as many
   *     values as the table has partition keys
   * @return map from partition key name to value, in partition-key order
   */
  public static Map<String, String> getPartitionKeyValues(Table table, Partition partition) {
    Map<String, String> partitionKeys = new LinkedHashMap<>();
    for (int i = 0; i < table.getPartitionKeysSize(); ++i) {
      partitionKeys.put(table.getPartitionKeys().get(i).getName(),
          partition.getValues().get(i));
    }
    return partitionKeys;
  }

  /**
   * Applies {@link #getPartitionKeyValues(Table, Partition)} to every partition of the iterator
   * and collects the results; the iterator is fully consumed.
   *
   * @param table table whose partition keys name the entries
   * @param iterator partitions to convert
   * @return one key/value map per partition, in iteration order
   */
  public static List<Map<String, String>> getPartitionKeyValues(final Table table,
      Iterator<Partition> iterator) {
    return Lists.newArrayList(Iterators
        .transform(iterator, partition -> getPartitionKeyValues(table, partition)));
  }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageEncoder.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageEncoder.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageEncoder.java new file mode 100644 index 0000000..832a80c --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageEncoder.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore.messaging; + /** Groups a {@link MessageSerializer}, a {@link MessageDeserializer} and the name of the message format they belong to. MessageFactory obtains implementations reflectively through a public static no-arg getInstance() method whose return type is assignable to MessageEncoder, so every implementation handed to MessageFactory has to provide one. */ +public interface MessageEncoder { + /** @return the {@link MessageDeserializer} of this encoder; presumably the counterpart of {@link #getSerializer()} (confirm against the implementations). */ MessageDeserializer getDeserializer(); + + /** @return the {@link MessageSerializer} of this encoder. */ MessageSerializer getSerializer(); + + /** @return the name of the message format; presumably the same string MessageFactory uses as its registry key (TODO confirm). */ String getMessageFormat(); + +} http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java index 58c6891..16e74bb 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java @@ -20,329 +20,86 @@ package org.apache.hadoop.hive.metastore.messaging; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.api.Catalog; -import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.Function; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.SQLForeignKey; -import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint; -import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; -import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.api.TxnToWriteId; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; -import org.apache.hadoop.hive.metastore.events.AcidWriteEvent; -import org.apache.hadoop.hive.metastore.utils.JavaUtils; +import
org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder; +import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.util.Iterator; -import java.util.List; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.util.HashMap; +import java.util.Map; /** * Abstract Factory for the construction of HCatalog message instances. */ public abstract class MessageFactory { + private static final Logger LOG = LoggerFactory.getLogger(MessageFactory.class.getName()); - // Common name constants for event messages - public static final String ADD_PARTITION_EVENT = "ADD_PARTITION"; - public static final String ALTER_PARTITION_EVENT = "ALTER_PARTITION"; - public static final String DROP_PARTITION_EVENT = "DROP_PARTITION"; - public static final String CREATE_TABLE_EVENT = "CREATE_TABLE"; - public static final String ALTER_TABLE_EVENT = "ALTER_TABLE"; - public static final String DROP_TABLE_EVENT = "DROP_TABLE"; - public static final String CREATE_DATABASE_EVENT = "CREATE_DATABASE"; - public static final String ALTER_DATABASE_EVENT = "ALTER_DATABASE"; - public static final String DROP_DATABASE_EVENT = "DROP_DATABASE"; - public static final String INSERT_EVENT = "INSERT"; - public static final String CREATE_FUNCTION_EVENT = "CREATE_FUNCTION"; - public static final String DROP_FUNCTION_EVENT = "DROP_FUNCTION"; - public static final String ADD_PRIMARYKEY_EVENT = "ADD_PRIMARYKEY"; - public static final String ADD_FOREIGNKEY_EVENT = "ADD_FOREIGNKEY"; - public static final String ADD_UNIQUECONSTRAINT_EVENT = "ADD_UNIQUECONSTRAINT"; - public static final String ADD_NOTNULLCONSTRAINT_EVENT = "ADD_NOTNULLCONSTRAINT"; - public static final String DROP_CONSTRAINT_EVENT = "DROP_CONSTRAINT"; - public static final String CREATE_ISCHEMA_EVENT = "CREATE_ISCHEMA"; - public static final String ALTER_ISCHEMA_EVENT = 
"ALTER_ISCHEMA"; - public static final String DROP_ISCHEMA_EVENT = "DROP_ISCHEMA"; - public static final String ADD_SCHEMA_VERSION_EVENT = "ADD_SCHEMA_VERSION"; - public static final String ALTER_SCHEMA_VERSION_EVENT = "ALTER_SCHEMA_VERSION"; - public static final String DROP_SCHEMA_VERSION_EVENT = "DROP_SCHEMA_VERSION"; - public static final String CREATE_CATALOG_EVENT = "CREATE_CATALOG"; - public static final String DROP_CATALOG_EVENT = "DROP_CATALOG"; - public static final String OPEN_TXN_EVENT = "OPEN_TXN"; - public static final String COMMIT_TXN_EVENT = "COMMIT_TXN"; - public static final String ABORT_TXN_EVENT = "ABORT_TXN"; - public static final String ALLOC_WRITE_ID_EVENT = "ALLOC_WRITE_ID_EVENT"; - public static final String ALTER_CATALOG_EVENT = "ALTER_CATALOG"; - public static final String ACID_WRITE_EVENT = "ACID_WRITE_EVENT"; + protected static final Configuration conf = MetastoreConf.newMetastoreConf(); - private static MessageFactory instance = null; + private static final Map<String, Method> registry = new HashMap<>(); - protected static final Configuration conf = MetastoreConf.newMetastoreConf(); - /* - // TODO MS-SPLIT I'm 99% certain we don't need this, as MetastoreConf.newMetastoreConf already - adds this resource. - static { - conf.addResource("hive-site.xml"); + public static void register(String messageFormat, Class clazz) { + Method method = requiredMethod(clazz); + registry.put(messageFormat, method); } - */ - - protected static final String MS_SERVER_URL = MetastoreConf.getVar(conf, ConfVars.THRIFT_URIS, ""); - protected static final String MS_SERVICE_PRINCIPAL = - MetastoreConf.getVar(conf, ConfVars.KERBEROS_PRINCIPAL, ""); + static { + register(GzipJSONMessageEncoder.FORMAT, GzipJSONMessageEncoder.class); + register(JSONMessageEncoder.FORMAT, JSONMessageEncoder.class); + } - /** - * Getter for MessageFactory instance. 
- */ - public static MessageFactory getInstance() { - if (instance == null) { - instance = getInstance(MetastoreConf.getVar(conf, ConfVars.EVENT_MESSAGE_FACTORY)); + private static Method requiredMethod(Class clazz) { + if (MessageEncoder.class.isAssignableFrom(clazz)) { + try { + Method methodInstance = clazz.getMethod("getInstance"); + if (MessageEncoder.class.isAssignableFrom(methodInstance.getReturnType())) { + int modifiers = methodInstance.getModifiers(); + if (Modifier.isStatic(modifiers) && Modifier.isPublic(modifiers)) { + return methodInstance; + } + throw new NoSuchMethodException( + "modifier for getInstance() method is not 'public static' in " + clazz + .getCanonicalName()); + } + throw new NoSuchMethodException( + "return type is not assignable to " + MessageEncoder.class.getCanonicalName()); + } catch (NoSuchMethodException e) { + String message = clazz.getCanonicalName() + + " does not implement the required 'public static MessageEncoder getInstance()' method "; + LOG.error(message, e); + throw new IllegalArgumentException(message, e); + } } - return instance; + String message = clazz.getCanonicalName() + " is not assignable to " + MessageEncoder.class + .getCanonicalName(); + LOG.error(message); + throw new IllegalArgumentException(message); } - private static MessageFactory getInstance(String className) { - try { - MessageFactory factory = JavaUtils.newInstance(JavaUtils.getClass(className, MessageFactory.class)); - factory.init(); - return factory; - } catch (MetaException e) { - throw new IllegalStateException("Could not construct MessageFactory implementation: ", e); + public static MessageEncoder getInstance(String messageFormat) + throws InvocationTargetException, IllegalAccessException { + Method methodInstance = registry.get(messageFormat); + if (methodInstance == null) { + LOG.error("received incorrect MessageFormat " + messageFormat); + throw new RuntimeException("messageFormat: " + messageFormat + " is not supported "); } + return 
(MessageEncoder) methodInstance.invoke(null); } - /** - * Getter for MessageDeserializer, corresponding to the specified format and version. - * @param format Serialization format for notifications. - * @param version Version of serialization format (currently ignored.) - * @return MessageDeserializer. - */ - public static MessageDeserializer getDeserializer(String format, - String version) { - return getInstance(MetastoreConf.getVar(conf, ConfVars.EVENT_MESSAGE_FACTORY)).getDeserializer(); - // Note : The reason this method exists outside the no-arg getDeserializer method is in - // case there is a user-implemented MessageFactory that's used, and some the messages - // are in an older format and the rest in another. Then, what MessageFactory is default - // is irrelevant, we should always use the one that was used to create it to deserialize. - // - // There exist only 2 implementations of this - json and jms - // - // Additional note : rather than as a config parameter, does it make sense to have - // this use jdbc-like semantics that each MessageFactory made available register - // itself for discoverability? Might be worth pursuing. + public static MessageEncoder getDefaultInstance(Configuration conf) { + String clazz = + MetastoreConf.get(conf, MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getVarname()); + try { + Class<?> clazzObject = MessageFactory.class.getClassLoader().loadClass(clazz); + return (MessageEncoder) requiredMethod(clazzObject).invoke(null); + } catch (Exception e) { + String message = "could not load the configured class " + clazz; + LOG.error(message, e); + throw new IllegalStateException(message, e); + } } - - public void init() throws MetaException {} - - public abstract MessageDeserializer getDeserializer(); - - /** - * Getter for message-format. - */ - public abstract String getMessageFormat(); - - /** - * Factory method for CreateDatabaseMessage. - * @param db The Database being added. - * @return CreateDatabaseMessage instance. 
- */ - public abstract CreateDatabaseMessage buildCreateDatabaseMessage(Database db); - - /** - * Factory method for AlterDatabaseMessage. - * @param beforeDb The Database before alter. - * @param afterDb The Database after alter. - * @return AlterDatabaseMessage instance. - */ - public abstract AlterDatabaseMessage buildAlterDatabaseMessage(Database beforeDb, Database afterDb); - - /** - * Factory method for DropDatabaseMessage. - * @param db The Database being dropped. - * @return DropDatabaseMessage instance. - */ - public abstract DropDatabaseMessage buildDropDatabaseMessage(Database db); - - /** - * Factory method for CreateTableMessage. - * @param table The Table being created. - * @param files Iterator of files - * @return CreateTableMessage instance. - */ - public abstract CreateTableMessage buildCreateTableMessage(Table table, Iterator<String> files); - - /** - * Factory method for AlterTableMessage. Unlike most of these calls, this one can return null, - * which means no message should be sent. This is because there are many flavors of alter - * table (add column, add partition, etc.). Some are covered elsewhere (like add partition) - * and some are not yet supported. - * @param before The table before the alter - * @param after The table after the alter - * @param isTruncateOp Flag to denote truncate table - * @param writeId writeId under which alter is done (for ACID tables) - * @return - */ - public abstract AlterTableMessage buildAlterTableMessage(Table before, Table after, boolean isTruncateOp, - Long writeId); - - /** - * Factory method for DropTableMessage. - * @param table The Table being dropped. - * @return DropTableMessage instance. - */ - public abstract DropTableMessage buildDropTableMessage(Table table); - - /** - * Factory method for AddPartitionMessage. - * @param table The Table to which the partitions are added. - * @param partitions The iterator to set of Partitions being added. 
- * @param partitionFiles The iterator of partition files - * @return AddPartitionMessage instance. - */ - public abstract AddPartitionMessage buildAddPartitionMessage(Table table, Iterator<Partition> partitions, - Iterator<PartitionFiles> partitionFiles); - - /** - * Factory method for building AlterPartitionMessage - * @param table The table in which the partition is being altered - * @param before The partition before it was altered - * @param after The partition after it was altered - * @param isTruncateOp Flag to denote truncate partition - * @param writeId writeId under which alter is done (for ACID tables) - * @return a new AlterPartitionMessage - */ - public abstract AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition before, - Partition after, boolean isTruncateOp, - Long writeId); - - /** - * Factory method for DropPartitionMessage. - * @param table The Table from which the partition is dropped. - * @param partitions The set of partitions being dropped. - * @return DropPartitionMessage instance. - */ - public abstract DropPartitionMessage buildDropPartitionMessage(Table table, Iterator<Partition> partitions); - - /** - * Factory method for CreateFunctionMessage. - * @param fn The Function being added. - * @return CreateFunctionMessage instance. - */ - public abstract CreateFunctionMessage buildCreateFunctionMessage(Function fn); - - /** - * Factory method for DropFunctionMessage. - * @param fn The Function being dropped. - * @return DropFunctionMessage instance. 
- */ - public abstract DropFunctionMessage buildDropFunctionMessage(Function fn); - - /** - * Factory method for building insert message - * - * @param tableObj Table object where the insert occurred in - * @param ptnObj Partition object where the insert occurred in, may be null if - * the insert was done into a non-partitioned table - * @param replace Flag to represent if INSERT OVERWRITE or INSERT INTO - * @param files Iterator of file created - * @return instance of InsertMessage - */ - public abstract InsertMessage buildInsertMessage(Table tableObj, Partition ptnObj, - boolean replace, Iterator<String> files); - - /** - * Factory method for building open txn message using start and end transaction range - * - * @param fromTxnId start transaction id (inclusive) - * @param toTxnId end transaction id (inclusive) - * @return instance of OpenTxnMessage - */ - public abstract OpenTxnMessage buildOpenTxnMessage(Long fromTxnId, Long toTxnId); - - /** - * Factory method for building commit txn message - * - * @param txnId Id of the transaction to be committed - * @return instance of CommitTxnMessage - */ - public abstract CommitTxnMessage buildCommitTxnMessage(Long txnId); - - /** - * Factory method for building abort txn message - * - * @param txnId Id of the transaction to be aborted - * @return instance of AbortTxnMessage - */ - public abstract AbortTxnMessage buildAbortTxnMessage(Long txnId); - - /** - * Factory method for building alloc write id message - * - * @param txnToWriteIdList List of Txn Ids and write id map - * @param dbName db for which write ids to be allocated - * @param tableName table for which write ids to be allocated - * @return instance of AllocWriteIdMessage - */ - public abstract AllocWriteIdMessage buildAllocWriteIdMessage(List<TxnToWriteId> txnToWriteIdList, String dbName, - String tableName); - - /*** - * Factory method for building add primary key message - * - * @param pks list of primary keys - * @return instance of AddPrimaryKeyMessage - 
*/ - public abstract AddPrimaryKeyMessage buildAddPrimaryKeyMessage(List<SQLPrimaryKey> pks); - - /*** - * Factory method for building add foreign key message - * - * @param fks list of foreign keys - * @return instance of AddForeignKeyMessage - */ - public abstract AddForeignKeyMessage buildAddForeignKeyMessage(List<SQLForeignKey> fks); - - /*** - * Factory method for building add unique constraint message - * - * @param uks list of unique constraints - * @return instance of SQLUniqueConstraint - */ - public abstract AddUniqueConstraintMessage buildAddUniqueConstraintMessage(List<SQLUniqueConstraint> uks); - - /*** - * Factory method for building add not null constraint message - * - * @param nns list of not null constraints - * @return instance of SQLNotNullConstraint - */ - public abstract AddNotNullConstraintMessage buildAddNotNullConstraintMessage(List<SQLNotNullConstraint> nns); - - /*** - * Factory method for building drop constraint message - * @param dbName - * @param tableName - * @param constraintName - * @return - */ - public abstract DropConstraintMessage buildDropConstraintMessage(String dbName, String tableName, - String constraintName); - - public abstract CreateCatalogMessage buildCreateCatalogMessage(Catalog catalog); - - public abstract DropCatalogMessage buildDropCatalogMessage(Catalog catalog); - - public abstract AlterCatalogMessage buildAlterCatalogMessage(Catalog oldCat, Catalog newCat); - - /** - * Factory method for building acid write message - * - * - * @param acidWriteEvent information related to the acid write operation - * @param files files added by this write operation - * @return instance of AcidWriteMessage - */ - public abstract AcidWriteMessage buildAcidWriteMessage(AcidWriteEvent acidWriteEvent, Iterator<String> files); } http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageSerializer.java 
---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageSerializer.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageSerializer.java new file mode 100644 index 0000000..b249d76 --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageSerializer.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore.messaging; + +public interface MessageSerializer { + default String serialize(EventMessage message) { + return message.toString(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java index fdb6942..712c12c 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java @@ -18,7 +18,7 @@ package org.apache.hadoop.hive.metastore.messaging.event.filters; import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.messaging.MessageFactory; +import org.apache.hadoop.hive.metastore.messaging.MessageBuilder; import java.util.regex.Pattern; @@ -41,9 +41,9 @@ public class DatabaseAndTableFilter extends BasicFilter { } private boolean isTxnRelatedEvent(final NotificationEvent event) { - return ((event.getEventType().equals(MessageFactory.OPEN_TXN_EVENT)) || - (event.getEventType().equals(MessageFactory.COMMIT_TXN_EVENT)) || - (event.getEventType().equals(MessageFactory.ABORT_TXN_EVENT))); + return ((event.getEventType().equals(MessageBuilder.OPEN_TXN_EVENT)) || + (event.getEventType().equals(MessageBuilder.COMMIT_TXN_EVENT)) || + (event.getEventType().equals(MessageBuilder.ABORT_TXN_EVENT))); } @Override 
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAcidWriteMessage.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAcidWriteMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAcidWriteMessage.java index cc528ee..a5d8f78 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAcidWriteMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAcidWriteMessage.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.events.AcidWriteEvent; import org.apache.hadoop.hive.metastore.messaging.AcidWriteMessage; +import org.apache.hadoop.hive.metastore.messaging.MessageBuilder; import org.apache.thrift.TException; import java.util.Iterator; import java.util.List; @@ -60,9 +61,9 @@ public class JSONAcidWriteMessage extends AcidWriteMessage { this.writeId = acidWriteEvent.getWriteId(); this.partition = acidWriteEvent.getPartition(); try { - this.tableObjJson = JSONMessageFactory.createTableObjJson(acidWriteEvent.getTableObj()); + this.tableObjJson = MessageBuilder.createTableObjJson(acidWriteEvent.getTableObj()); if (acidWriteEvent.getPartitionObj() != null) { - this.partitionObjJson = JSONMessageFactory.createPartitionObjJson(acidWriteEvent.getPartitionObj()); + this.partitionObjJson = MessageBuilder.createPartitionObjJson(acidWriteEvent.getPartitionObj()); } else { this.partitionObjJson = null; } @@ -119,13 +120,13 @@ public class JSONAcidWriteMessage extends AcidWriteMessage { @Override public Table getTableObj() throws 
Exception { - return (tableObjJson == null) ? null : (Table) JSONMessageFactory.getTObj(tableObjJson, Table.class); + return (tableObjJson == null) ? null : (Table) MessageBuilder.getTObj(tableObjJson, Table.class); } @Override public Partition getPartitionObj() throws Exception { return ((partitionObjJson == null) ? null : - (Partition) JSONMessageFactory.getTObj(partitionObjJson, Partition.class)); + (Partition) MessageBuilder.getTObj(partitionObjJson, Partition.class)); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddForeignKeyMessage.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddForeignKeyMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddForeignKeyMessage.java index d4a0bc2..c3d6fb6 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddForeignKeyMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddForeignKeyMessage.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.hadoop.hive.metastore.api.SQLForeignKey; import org.apache.hadoop.hive.metastore.messaging.AddForeignKeyMessage; +import org.apache.hadoop.hive.metastore.messaging.MessageBuilder; import org.apache.thrift.TException; import com.fasterxml.jackson.annotation.JsonProperty; @@ -55,7 +56,7 @@ public class JSONAddForeignKeyMessage extends AddForeignKeyMessage { this.foreignKeyListJson = new ArrayList<>(); try { for (SQLForeignKey pk : fks) { - foreignKeyListJson.add(JSONMessageFactory.createForeignKeyObjJson(pk)); + foreignKeyListJson.add(MessageBuilder.createForeignKeyObjJson(pk)); } } catch (TException e) { 
throw new IllegalArgumentException("Could not serialize: ", e); @@ -86,7 +87,7 @@ public class JSONAddForeignKeyMessage extends AddForeignKeyMessage { public List<SQLForeignKey> getForeignKeys() throws Exception { List<SQLForeignKey> fks = new ArrayList<>(); for (String pkJson : foreignKeyListJson) { - fks.add((SQLForeignKey)JSONMessageFactory.getTObj(pkJson, SQLForeignKey.class)); + fks.add((SQLForeignKey) MessageBuilder.getTObj(pkJson, SQLForeignKey.class)); } return fks; } http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddNotNullConstraintMessage.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddNotNullConstraintMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddNotNullConstraintMessage.java index 1c3e8a8..f9f351f 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddNotNullConstraintMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddNotNullConstraintMessage.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint; import org.apache.hadoop.hive.metastore.messaging.AddNotNullConstraintMessage; +import org.apache.hadoop.hive.metastore.messaging.MessageBuilder; import org.apache.thrift.TException; import com.fasterxml.jackson.annotation.JsonProperty; @@ -50,7 +51,7 @@ public class JSONAddNotNullConstraintMessage extends AddNotNullConstraintMessage this.notNullConstraintListJson = new ArrayList<>(); try { for (SQLNotNullConstraint nn : nns) { - notNullConstraintListJson.add(JSONMessageFactory.createNotNullConstraintObjJson(nn)); + 
notNullConstraintListJson.add(MessageBuilder.createNotNullConstraintObjJson(nn)); } } catch (TException e) { throw new IllegalArgumentException("Could not serialize: ", e); @@ -81,7 +82,7 @@ public class JSONAddNotNullConstraintMessage extends AddNotNullConstraintMessage public List<SQLNotNullConstraint> getNotNullConstraints() throws Exception { List<SQLNotNullConstraint> nns = new ArrayList<>(); for (String nnJson : notNullConstraintListJson) { - nns.add((SQLNotNullConstraint)JSONMessageFactory.getTObj(nnJson, SQLNotNullConstraint.class)); + nns.add((SQLNotNullConstraint) MessageBuilder.getTObj(nnJson, SQLNotNullConstraint.class)); } return nns; } http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPartitionMessage.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPartitionMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPartitionMessage.java index bb2093b..6494cb8 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPartitionMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPartitionMessage.java @@ -29,6 +29,7 @@ import javax.annotation.Nullable; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.MessageBuilder; import org.apache.hadoop.hive.metastore.messaging.PartitionFiles; import org.apache.thrift.TException; @@ -79,11 +80,11 @@ public class JSONAddPartitionMessage extends AddPartitionMessage { partitionListJson = new 
ArrayList<>(); Partition partitionObj; try { - this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj); + this.tableObjJson = MessageBuilder.createTableObjJson(tableObj); while (partitionsIterator.hasNext()) { partitionObj = partitionsIterator.next(); - partitions.add(JSONMessageFactory.getPartitionKeyValues(tableObj, partitionObj)); - partitionListJson.add(JSONMessageFactory.createPartitionObjJson(partitionObj)); + partitions.add(MessageBuilder.getPartitionKeyValues(tableObj, partitionObj)); + partitionListJson.add(MessageBuilder.createPartitionObjJson(partitionObj)); } } catch (TException e) { throw new IllegalArgumentException("Could not serialize: ", e); @@ -124,7 +125,7 @@ public class JSONAddPartitionMessage extends AddPartitionMessage { @Override public Table getTableObj() throws Exception { - return (Table) JSONMessageFactory.getTObj(tableObjJson,Table.class); + return (Table) MessageBuilder.getTObj(tableObjJson,Table.class); } @Override @@ -141,7 +142,7 @@ public class JSONAddPartitionMessage extends AddPartitionMessage { public Iterable<Partition> getPartitionObjs() throws Exception { // glorified cast from Iterable<TBase> to Iterable<Partition> return Iterables.transform( - JSONMessageFactory.getTObjs(partitionListJson,Partition.class), + MessageBuilder.getTObjs(partitionListJson,Partition.class), new Function<Object, Partition>() { @Nullable @Override http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPrimaryKeyMessage.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPrimaryKeyMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPrimaryKeyMessage.java index 3a18be8..606a051 100644 --- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPrimaryKeyMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPrimaryKeyMessage.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; import org.apache.hadoop.hive.metastore.messaging.AddPrimaryKeyMessage; +import org.apache.hadoop.hive.metastore.messaging.MessageBuilder; import org.apache.thrift.TException; import com.fasterxml.jackson.annotation.JsonProperty; @@ -55,7 +56,7 @@ public class JSONAddPrimaryKeyMessage extends AddPrimaryKeyMessage { this.primaryKeyListJson = new ArrayList<>(); try { for (SQLPrimaryKey pk : pks) { - primaryKeyListJson.add(JSONMessageFactory.createPrimaryKeyObjJson(pk)); + primaryKeyListJson.add(MessageBuilder.createPrimaryKeyObjJson(pk)); } } catch (TException e) { throw new IllegalArgumentException("Could not serialize: ", e); @@ -86,7 +87,7 @@ public class JSONAddPrimaryKeyMessage extends AddPrimaryKeyMessage { public List<SQLPrimaryKey> getPrimaryKeys() throws Exception { List<SQLPrimaryKey> pks = new ArrayList<>(); for (String pkJson : primaryKeyListJson) { - pks.add((SQLPrimaryKey)JSONMessageFactory.getTObj(pkJson, SQLPrimaryKey.class)); + pks.add((SQLPrimaryKey) MessageBuilder.getTObj(pkJson, SQLPrimaryKey.class)); } return pks; } http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddUniqueConstraintMessage.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddUniqueConstraintMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddUniqueConstraintMessage.java index 3c4d5e0..ebdcd94 
100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddUniqueConstraintMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddUniqueConstraintMessage.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; import org.apache.hadoop.hive.metastore.messaging.AddUniqueConstraintMessage; +import org.apache.hadoop.hive.metastore.messaging.MessageBuilder; import org.apache.thrift.TException; import com.fasterxml.jackson.annotation.JsonProperty; @@ -52,7 +53,7 @@ public class JSONAddUniqueConstraintMessage extends AddUniqueConstraintMessage { this.uniqueConstraintListJson = new ArrayList<>(); try { for (SQLUniqueConstraint uk : uks) { - uniqueConstraintListJson.add(JSONMessageFactory.createUniqueConstraintObjJson(uk)); + uniqueConstraintListJson.add(MessageBuilder.createUniqueConstraintObjJson(uk)); } } catch (TException e) { throw new IllegalArgumentException("Could not serialize: ", e); @@ -83,7 +84,7 @@ public class JSONAddUniqueConstraintMessage extends AddUniqueConstraintMessage { public List<SQLUniqueConstraint> getUniqueConstraints() throws Exception { List<SQLUniqueConstraint> uks = new ArrayList<>(); for (String pkJson : uniqueConstraintListJson) { - uks.add((SQLUniqueConstraint)JSONMessageFactory.getTObj(pkJson, SQLUniqueConstraint.class)); + uks.add((SQLUniqueConstraint) MessageBuilder.getTObj(pkJson, SQLUniqueConstraint.class)); } return uks; } http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterCatalogMessage.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterCatalogMessage.java 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterCatalogMessage.java index 779c0b0..7b7c12e 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterCatalogMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterCatalogMessage.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.metastore.messaging.json; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.hadoop.hive.metastore.api.Catalog; import org.apache.hadoop.hive.metastore.messaging.AlterCatalogMessage; +import org.apache.hadoop.hive.metastore.messaging.MessageBuilder; import org.apache.thrift.TException; public class JSONAlterCatalogMessage extends AlterCatalogMessage { @@ -41,8 +42,8 @@ public class JSONAlterCatalogMessage extends AlterCatalogMessage { this.servicePrincipal = servicePrincipal; this.timestamp = timestamp; try { - this.catObjBeforeJson = JSONMessageFactory.createCatalogObjJson(catObjBefore); - this.catObjAfterJson = JSONMessageFactory.createCatalogObjJson(catObjAfter); + this.catObjBeforeJson = MessageBuilder.createCatalogObjJson(catObjBefore); + this.catObjAfterJson = MessageBuilder.createCatalogObjJson(catObjAfter); } catch (TException e) { throw new IllegalArgumentException("Could not serialize: ", e); } @@ -71,12 +72,12 @@ public class JSONAlterCatalogMessage extends AlterCatalogMessage { @Override public Catalog getCatObjBefore() throws Exception { - return (Catalog) JSONMessageFactory.getTObj(catObjBeforeJson, Catalog.class); + return (Catalog) MessageBuilder.getTObj(catObjBeforeJson, Catalog.class); } @Override public Catalog getCatObjAfter() throws Exception { - return (Catalog) JSONMessageFactory.getTObj(catObjAfterJson, Catalog.class); + return (Catalog) MessageBuilder.getTObj(catObjAfterJson, Catalog.class); } @Override 
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterDatabaseMessage.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterDatabaseMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterDatabaseMessage.java index 7b316d5..5f9dae4 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterDatabaseMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterDatabaseMessage.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.metastore.messaging.json; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage; +import org.apache.hadoop.hive.metastore.messaging.MessageBuilder; import org.apache.thrift.TException; import com.fasterxml.jackson.annotation.JsonProperty; @@ -48,8 +49,8 @@ public class JSONAlterDatabaseMessage extends AlterDatabaseMessage { this.db = dbObjBefore.getName(); this.timestamp = timestamp; try { - this.dbObjBeforeJson = JSONMessageFactory.createDatabaseObjJson(dbObjBefore); - this.dbObjAfterJson = JSONMessageFactory.createDatabaseObjJson(dbObjAfter); + this.dbObjBeforeJson = MessageBuilder.createDatabaseObjJson(dbObjBefore); + this.dbObjAfterJson = MessageBuilder.createDatabaseObjJson(dbObjAfter); } catch (TException e) { throw new IllegalArgumentException("Could not serialize: ", e); } @@ -78,12 +79,12 @@ public class JSONAlterDatabaseMessage extends AlterDatabaseMessage { @Override public Database getDbObjBefore() throws Exception { - return (Database) JSONMessageFactory.getTObj(dbObjBeforeJson, Database.class); + return (Database) 
MessageBuilder.getTObj(dbObjBeforeJson, Database.class); } @Override public Database getDbObjAfter() throws Exception { - return (Database) JSONMessageFactory.getTObj(dbObjAfterJson, Database.class); + return (Database) MessageBuilder.getTObj(dbObjAfterJson, Database.class); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java index 9b85f4c..a38c1aa 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.MessageBuilder; import org.apache.thrift.TException; import com.fasterxml.jackson.annotation.JsonProperty; @@ -62,12 +63,12 @@ public class JSONAlterPartitionMessage extends AlterPartitionMessage { this.tableType = tableObj.getTableType(); this.isTruncateOp = Boolean.toString(isTruncateOp); this.timestamp = timestamp; - this.keyValues = JSONMessageFactory.getPartitionKeyValues(tableObj, partitionObjBefore); + this.keyValues = MessageBuilder.getPartitionKeyValues(tableObj, partitionObjBefore); this.writeId = writeId; try { - this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj); - 
this.partitionObjBeforeJson = JSONMessageFactory.createPartitionObjJson(partitionObjBefore); - this.partitionObjAfterJson = JSONMessageFactory.createPartitionObjJson(partitionObjAfter); + this.tableObjJson = MessageBuilder.createTableObjJson(tableObj); + this.partitionObjBeforeJson = MessageBuilder.createPartitionObjJson(partitionObjBefore); + this.partitionObjAfterJson = MessageBuilder.createPartitionObjJson(partitionObjAfter); } catch (TException e) { throw new IllegalArgumentException("Could not serialize: ", e); } @@ -118,17 +119,17 @@ public class JSONAlterPartitionMessage extends AlterPartitionMessage { @Override public Table getTableObj() throws Exception { - return (Table) JSONMessageFactory.getTObj(tableObjJson,Table.class); + return (Table) MessageBuilder.getTObj(tableObjJson,Table.class); } @Override public Partition getPtnObjBefore() throws Exception { - return (Partition) JSONMessageFactory.getTObj(partitionObjBeforeJson, Partition.class); + return (Partition) MessageBuilder.getTObj(partitionObjBeforeJson, Partition.class); } @Override public Partition getPtnObjAfter() throws Exception { - return (Partition) JSONMessageFactory.getTObj(partitionObjAfterJson, Partition.class); + return (Partition) MessageBuilder.getTObj(partitionObjAfterJson, Partition.class); } public String getTableObjJson() { http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java index eddff98..d6ec826 100644 --- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.metastore.messaging.json; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; +import org.apache.hadoop.hive.metastore.messaging.MessageBuilder; import org.apache.thrift.TException; import com.fasterxml.jackson.annotation.JsonProperty; @@ -55,8 +56,8 @@ public class JSONAlterTableMessage extends AlterTableMessage { this.timestamp = timestamp; this.writeId = writeId; try { - this.tableObjBeforeJson = JSONMessageFactory.createTableObjJson(tableObjBefore); - this.tableObjAfterJson = JSONMessageFactory.createTableObjJson(tableObjAfter); + this.tableObjBeforeJson = MessageBuilder.createTableObjJson(tableObjBefore); + this.tableObjAfterJson = MessageBuilder.createTableObjJson(tableObjAfter); } catch (TException e) { throw new IllegalArgumentException("Could not serialize: ", e); } @@ -102,12 +103,12 @@ public class JSONAlterTableMessage extends AlterTableMessage { @Override public Table getTableObjBefore() throws Exception { - return (Table) JSONMessageFactory.getTObj(tableObjBeforeJson,Table.class); + return (Table) MessageBuilder.getTObj(tableObjBeforeJson,Table.class); } @Override public Table getTableObjAfter() throws Exception { - return (Table) JSONMessageFactory.getTObj(tableObjAfterJson,Table.class); + return (Table) MessageBuilder.getTObj(tableObjAfterJson,Table.class); } public String getTableObjBeforeJson() { http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java ---------------------------------------------------------------------- diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java index 2c4940b..482fc8e 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.metastore.api.WriteEventInfo; import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.hadoop.hive.metastore.messaging.MessageBuilder; import java.util.List; @@ -118,13 +119,13 @@ public class JSONCommitTxnMessage extends CommitTxnMessage { @Override public Table getTableObj(int idx) throws Exception { - return tableObjs == null ? null : (Table) JSONMessageFactory.getTObj(tableObjs.get(idx), Table.class); + return tableObjs == null ? null : (Table) MessageBuilder.getTObj(tableObjs.get(idx), Table.class); } @Override public Partition getPartitionObj(int idx) throws Exception { return (partitionObjs == null ? null : (partitionObjs.get(idx) == null ? 
null : - (Partition)JSONMessageFactory.getTObj(partitionObjs.get(idx), Partition.class))); + (Partition) MessageBuilder.getTObj(partitionObjs.get(idx), Partition.class))); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateDatabaseMessage.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateDatabaseMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateDatabaseMessage.java index 761ff99..1f5c9e8 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateDatabaseMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateDatabaseMessage.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hive.metastore.messaging.json; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.messaging.CreateDatabaseMessage; +import org.apache.hadoop.hive.metastore.messaging.MessageBuilder; import org.apache.thrift.TException; import com.fasterxml.jackson.annotation.JsonProperty; @@ -48,7 +49,7 @@ public class JSONCreateDatabaseMessage extends CreateDatabaseMessage { this.db = db.getName(); this.timestamp = timestamp; try { - this.dbJson = JSONMessageFactory.createDatabaseObjJson(db); + this.dbJson = MessageBuilder.createDatabaseObjJson(db); } catch (TException ex) { throw new IllegalArgumentException("Could not serialize Function object", ex); } @@ -57,7 +58,7 @@ public class JSONCreateDatabaseMessage extends CreateDatabaseMessage { @Override public Database getDatabaseObject() throws Exception { - return (Database) JSONMessageFactory.getTObj(dbJson, Database.class); + return 
(Database) MessageBuilder.getTObj(dbJson, Database.class); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateFunctionMessage.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateFunctionMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateFunctionMessage.java index f7287df..bb50052 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateFunctionMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateFunctionMessage.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hive.metastore.messaging.json; import org.apache.hadoop.hive.metastore.api.Function; import org.apache.hadoop.hive.metastore.messaging.CreateFunctionMessage; +import org.apache.hadoop.hive.metastore.messaging.MessageBuilder; import org.apache.thrift.TException; import com.fasterxml.jackson.annotation.JsonProperty; @@ -47,7 +48,7 @@ public class JSONCreateFunctionMessage extends CreateFunctionMessage { this.db = fn.getDbName(); this.timestamp = timestamp; try { - this.functionObjJson = JSONMessageFactory.createFunctionObjJson(fn); + this.functionObjJson = MessageBuilder.createFunctionObjJson(fn); } catch (TException ex) { throw new IllegalArgumentException("Could not serialize Function object", ex); } @@ -72,7 +73,7 @@ public class JSONCreateFunctionMessage extends CreateFunctionMessage { @Override public Function getFunctionObj() throws Exception { - return (Function) JSONMessageFactory.getTObj(functionObjJson,Function.class); + return (Function) MessageBuilder.getTObj(functionObjJson,Function.class); } @Override 
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateTableMessage.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateTableMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateTableMessage.java index b80003b..145ee4b 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateTableMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateTableMessage.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage; +import org.apache.hadoop.hive.metastore.messaging.MessageBuilder; import org.apache.thrift.TException; import com.fasterxml.jackson.annotation.JsonProperty; @@ -68,7 +69,7 @@ public class JSONCreateTableMessage extends CreateTableMessage { this(server, servicePrincipal, tableObj.getDbName(), tableObj.getTableName(), tableObj.getTableType(), timestamp); try { - this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj); + this.tableObjJson = MessageBuilder.createTableObjJson(tableObj); } catch (TException e) { throw new IllegalArgumentException("Could not serialize: ", e); } @@ -111,7 +112,7 @@ public class JSONCreateTableMessage extends CreateTableMessage { @Override public Table getTableObj() throws Exception { - return (Table) JSONMessageFactory.getTObj(tableObjJson,Table.class); + return (Table) MessageBuilder.getTObj(tableObjJson,Table.class); } public String getTableObjJson() { 
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropPartitionMessage.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropPartitionMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropPartitionMessage.java index 957d595..23e5496 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropPartitionMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropPartitionMessage.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.messaging.MessageBuilder; import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage; import org.apache.thrift.TException; @@ -70,7 +71,7 @@ public class JSONDropPartitionMessage extends DropPartitionMessage { this(server, servicePrincipal, tableObj.getDbName(), tableObj.getTableName(), tableObj.getTableType(), partitionKeyValues, timestamp); try { - this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj); + this.tableObjJson = MessageBuilder.createTableObjJson(tableObj); } catch (TException e) { throw new IllegalArgumentException("Could not serialize: ", e); } @@ -117,7 +118,7 @@ public class JSONDropPartitionMessage extends DropPartitionMessage { @Override public Table getTableObj() throws Exception { - return (Table) JSONMessageFactory.getTObj(tableObjJson, Table.class); + return (Table) MessageBuilder.getTObj(tableObjJson, Table.class); } public String getTableObjJson() { 
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropTableMessage.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropTableMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropTableMessage.java index 88374ec..1ef2ad0 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropTableMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropTableMessage.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.metastore.messaging.json; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.messaging.MessageBuilder; import org.apache.hadoop.hive.metastore.messaging.DropTableMessage; import org.apache.thrift.TException; @@ -63,7 +64,7 @@ public class JSONDropTableMessage extends DropTableMessage { this(server, servicePrincipal, tableObj.getDbName(), tableObj.getTableName(), tableObj.getTableType(), timestamp); try { - this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj); + this.tableObjJson = MessageBuilder.createTableObjJson(tableObj); } catch (TException e) { throw new IllegalArgumentException("Could not serialize: ", e); } @@ -86,7 +87,7 @@ public class JSONDropTableMessage extends DropTableMessage { @Override public Table getTableObj() throws Exception { - return (Table) JSONMessageFactory.getTObj(tableObjJson,Table.class); + return (Table) MessageBuilder.getTObj(tableObjJson,Table.class); } @Override 
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java index 2318a67..40d480b 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.messaging.MessageBuilder; import org.apache.hadoop.hive.metastore.messaging.InsertMessage; import org.apache.thrift.TException; @@ -67,9 +68,9 @@ public class JSONInsertMessage extends InsertMessage { this.tableType = tableObj.getTableType(); try { - this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj); + this.tableObjJson = MessageBuilder.createTableObjJson(tableObj); if (null != ptnObj) { - this.ptnObjJson = JSONMessageFactory.createPartitionObjJson(ptnObj); + this.ptnObjJson = MessageBuilder.createPartitionObjJson(ptnObj); } else { this.ptnObjJson = null; } @@ -128,12 +129,12 @@ public class JSONInsertMessage extends InsertMessage { @Override public Table getTableObj() throws Exception { - return (Table) JSONMessageFactory.getTObj(tableObjJson,Table.class); + return (Table) MessageBuilder.getTObj(tableObjJson,Table.class); } @Override public Partition getPtnObj() throws Exception { - return ((null == ptnObjJson) ? 
null : (Partition) JSONMessageFactory.getTObj(ptnObjJson, Partition.class)); + return ((null == ptnObjJson) ? null : (Partition) MessageBuilder.getTObj(ptnObjJson, Partition.class)); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageEncoder.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageEncoder.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageEncoder.java new file mode 100644 index 0000000..08c53e4 --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageEncoder.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.hadoop.hive.metastore.messaging.json;
+
+import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.apache.hadoop.hive.metastore.messaging.MessageEncoder;
+import org.apache.hadoop.hive.metastore.messaging.MessageSerializer;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+
+/**
+ * The JSON implementation of {@link MessageEncoder}. Pairs a
+ * {@link JSONMessageDeserializer} with a {@link MessageSerializer} and reports
+ * {@link #FORMAT} as its message format.
+ */
+public class JSONMessageEncoder implements MessageEncoder {
+  /** Identifier returned by {@link #getMessageFormat()}. */
+  public static final String FORMAT = "json-0.2";
+
+  /** Deserializer shared by every caller of {@link #getDeserializer()}. */
+  private static final MessageDeserializer deserializer = new JSONMessageDeserializer();
+
+  /**
+   * Serializer shared by every caller of {@link #getSerializer()}. The anonymous
+   * body is empty, so it overrides nothing and relies entirely on whatever
+   * {@link MessageSerializer} itself provides.
+   */
+  private static final MessageSerializer serializer = new MessageSerializer() {
+  };
+
+  /** Lazily created singleton; volatile so the unsynchronized first check is safe. */
+  private static volatile MessageEncoder instance;
+
+  /**
+   * Returns the shared encoder, creating it on first use with double-checked locking.
+   *
+   * @return the singleton {@link JSONMessageEncoder}
+   */
+  public static MessageEncoder getInstance() {
+    if (instance == null) {
+      // Guard this class's singleton with this class's own monitor. The lock was
+      // previously taken on GzipJSONMessageEncoder.class, which tied the two
+      // unrelated singletons to the same monitor.
+      synchronized (JSONMessageEncoder.class) {
+        if (instance == null) {
+          instance = new JSONMessageEncoder();
+        }
+      }
+    }
+    return instance;
+  }
+
+  /**
+   * @return the shared JSON deserializer
+   */
+  @Override
+  public MessageDeserializer getDeserializer() {
+    return deserializer;
+  }
+
+  /**
+   * @return the shared serializer
+   */
+  @Override
+  public MessageSerializer getSerializer() {
+    return serializer;
+  }
+
+  /**
+   * This format has already shipped: any change must preserve backward compatibility
+   * with existing messages written in this format.
+   *
+   * @return {@link #FORMAT}
+   */
+  @Override
+  public String getMessageFormat() {
+    return FORMAT;
+  }
+}
