Repository: hive Updated Branches: refs/heads/branch-3 eea9043c8 -> d05361e58
HIVE-20209: Metastore connection fails for first attempt in repl dump (Sankar Hariappan, reviewed by Mahesh Kumar Behera, Anishek Agarwal) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d05361e5 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d05361e5 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d05361e5 Branch: refs/heads/branch-3 Commit: d05361e5800da465692787c07c96a168a4063d70 Parents: eea9043 Author: Sankar Hariappan <[email protected]> Authored: Mon Jul 30 23:23:52 2018 +0530 Committer: Sankar Hariappan <[email protected]> Committed: Mon Jul 30 23:23:52 2018 +0530 ---------------------------------------------------------------------- .../api/repl/ReplicationV1CompatRule.java | 29 ++- .../hive/ql/parse/TestReplicationScenarios.java | 3 +- .../ql/cache/results/QueryResultsCache.java | 4 - .../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 30 +-- .../apache/hadoop/hive/ql/metadata/Hive.java | 5 +- .../hive/ql/metadata/events/EventUtils.java | 203 +++++++++++++++++++ .../metadata/events/NotificationEventPoll.java | 7 +- .../hive/metastore/messaging/EventUtils.java | 202 ------------------ 8 files changed, 231 insertions(+), 252 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/d05361e5/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/ReplicationV1CompatRule.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/ReplicationV1CompatRule.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/ReplicationV1CompatRule.java index d5e227c..550a5e5 100644 --- a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/ReplicationV1CompatRule.java +++ b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/ReplicationV1CompatRule.java @@ -22,8 +22,10 @@ import com.google.common.primitives.Ints; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.messaging.EventUtils; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.events.EventUtils; import org.apache.hive.hcatalog.api.HCatClient; import org.apache.hive.hcatalog.api.HCatNotificationEvent; import org.apache.thrift.TException; @@ -43,6 +45,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + /** * Utility class to enable testing of Replv1 compatibility testing. * @@ -65,6 +70,8 @@ public class ReplicationV1CompatRule implements TestRule { private HiveConf hconf = null; private List<String> testsToSkip = null; + private Hive hiveDb; + public ReplicationV1CompatRule(IMetaStoreClient metaStoreClient, HiveConf hconf){ this(metaStoreClient, hconf, new ArrayList<String>()); } @@ -79,6 +86,7 @@ public class ReplicationV1CompatRule implements TestRule { }; this.testsToSkip = testsToSkip; LOG.info("Replv1 backward compatibility tester initialized at " + testEventId.get()); + this.hiveDb = mock(Hive.class); } private Long getCurrentNotificationId(){ @@ -137,16 +145,17 @@ public class ReplicationV1CompatRule implements TestRule { return true; } }; - EventUtils.MSClientNotificationFetcher evFetcher = - new EventUtils.MSClientNotificationFetcher(metaStoreClient); try { + when(hiveDb.getMSC()).thenReturn(metaStoreClient); + EventUtils.MSClientNotificationFetcher evFetcher = + new EventUtils.MSClientNotificationFetcher(hiveDb); EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator( - evFetcher, testEventIdBefore, - Ints.checkedCast(testEventIdAfter - testEventIdBefore) + 1, - evFilter); + evFetcher, testEventIdBefore, + Ints.checkedCast(testEventIdAfter - testEventIdBefore) + 1, + evFilter); ReplicationTask.resetFactory(null); - assertTrue("We should have found some events",evIter.hasNext()); - while (evIter.hasNext()){ + assertTrue("We should have found some events", evIter.hasNext()); + while (evIter.hasNext()) { eventCount++; NotificationEvent ev = evIter.next(); // convert to HCatNotificationEvent, and then try to instantiate a ReplicationTask on it. @@ -155,11 +164,11 @@ public class ReplicationV1CompatRule implements TestRule { if (rtask instanceof ErroredReplicationTask) { unhandledTasks.put(ev, ((ErroredReplicationTask) rtask).getCause()); } - } catch (RuntimeException re){ + } catch (RuntimeException re) { incompatibleTasks.put(ev, re); } } - } catch (IOException e) { + } catch (IOException | MetaException e) { assertNull("Got an exception when we shouldn't have - replv1 backward incompatibility issue:",e); } http://git-wip-us.apache.org/repos/asf/hive/blob/d05361e5/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 1e0efe7..4b6bc77 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -76,6 +76,7 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.ErrorMsg; import javax.annotation.Nullable; + import java.io.File; import java.io.FileWriter; import java.io.IOException; @@ -1556,7 +1557,7 @@ public class TestReplicationScenarios { run("USE " + replDbName, driverMirror); verifyRunWithPatternMatch("SHOW TABLE EXTENDED LIKE namelist PARTITION (year=1990,month=5,day=25)", "location", "namelist/year=1990/month=5/day=25", driverMirror); - run("USE " + dbName, driverMirror); + run("USE " + dbName, driver); String[] ptn_data_3 = new String[] { "abraham", "bob", "carter", "david", "fisher" }; String[] data_after_ovwrite = new String[] { "fisher" }; http://git-wip-us.apache.org/repos/asf/hive/blob/d05361e5/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java index d29c4da..783e4e5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java @@ -37,14 +37,12 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Stream; @@ -62,13 +60,11 @@ import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.messaging.EventUtils; import org.apache.hadoop.hive.metastore.messaging.MessageFactory; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.hooks.Entity.Type; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.events.EventConsumer; http://git-wip-us.apache.org/repos/asf/hive/blob/d05361e5/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 4eba910..8727294 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hive.ql.exec.repl; -import com.google.common.primitives.Ints; - import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidTxnList; @@ -33,7 +31,6 @@ import org.apache.hadoop.hive.metastore.api.SQLForeignKey; import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint; import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; -import org.apache.hadoop.hive.metastore.messaging.EventUtils; import org.apache.hadoop.hive.metastore.messaging.MessageFactory; import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter; @@ -48,6 +45,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.metadata.events.EventUtils; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer; @@ -160,7 +158,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { new MessageFormatFilter(MessageFactory.getInstance().getMessageFormat())); EventUtils.MSClientNotificationFetcher evFetcher - = new EventUtils.MSClientNotificationFetcher(getHive().getMSC()); + = new EventUtils.MSClientNotificationFetcher(getHive()); EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator( evFetcher, work.eventFrom, work.maxEventLimit(), evFilter); @@ -247,30 +245,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { replLogger.endLog(bootDumpBeginReplId.toString()); } Long bootDumpEndReplId = hiveDb.getMSC().getCurrentNotificationEventId().getEventId(); - LOG.info("Bootstrap object dump phase took from {} to {}", bootDumpBeginReplId, - bootDumpEndReplId); - - // Now that bootstrap has dumped all objects related, we have to account for the changes - // that occurred while bootstrap was happening - i.e. we have to look through all events - // during the bootstrap period and consolidate them with our dump. - - IMetaStoreClient.NotificationFilter evFilter = - new DatabaseAndTableFilter(work.dbNameOrPattern, work.tableNameOrPattern); - EventUtils.MSClientNotificationFetcher evFetcher = - new EventUtils.MSClientNotificationFetcher(hiveDb.getMSC()); - EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator( - evFetcher, bootDumpBeginReplId, - Ints.checkedCast(bootDumpEndReplId - bootDumpBeginReplId) + 1, - evFilter); - // Now we consolidate all the events that happenned during the objdump into the objdump - while (evIter.hasNext()) { - NotificationEvent ev = evIter.next(); - Path eventRoot = new Path(dumpRoot, String.valueOf(ev.getEventId())); - // FIXME : implement consolidateEvent(..) similar to dumpEvent(ev,evRoot) - } - LOG.info( - "Consolidation done, preparing to return {},{}->{}", + LOG.info("Preparing to return {},{}->{}", dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId); dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot); dmd.write(); http://git-wip-us.apache.org/repos/asf/hive/blob/d05361e5/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 953cd1d..8c8af47 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -457,9 +457,8 @@ public class Hive { metaStoreClient.close(); metaStoreClient = null; } - if (syncMetaStoreClient != null) { - syncMetaStoreClient.close(); - } + // syncMetaStoreClient is wrapped on metaStoreClient. So, it is enough to close it once. + syncMetaStoreClient = null; if (owner != null) { owner = null; } http://git-wip-us.apache.org/repos/asf/hive/blob/d05361e5/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java new file mode 100644 index 0000000..66abd51 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hive.ql.metadata.events; + +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.thrift.TException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +public class EventUtils { + + public interface NotificationFetcher { + int getBatchSize() throws IOException; + long getCurrentNotificationEventId() throws IOException; + long getDbNotificationEventsCount(long fromEventId, String dbName) throws IOException; + List<NotificationEvent> getNextNotificationEvents( + long pos, IMetaStoreClient.NotificationFilter filter) throws IOException; + } + + // MetaStoreClient-based impl of NotificationFetcher + public static class MSClientNotificationFetcher implements NotificationFetcher{ + + private Hive hiveDb = null; + private Integer batchSize = null; + + public MSClientNotificationFetcher(Hive hiveDb){ + this.hiveDb = hiveDb; + } + + @Override + public int getBatchSize() throws IOException { + if (batchSize == null){ + try { + batchSize = Integer.parseInt( + hiveDb.getMSC().getConfigValue(MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX.toString(), "50")); + // TODO: we're asking the metastore what its configuration for this var is - we may + // want to revisit to pull from client side instead. The reason I have it this way + // is because the metastore is more likely to have a reasonable config for this than + // an arbitrary client. + } catch (TException e) { + throw new IOException(e); + } + } + return batchSize; + } + + @Override + public long getCurrentNotificationEventId() throws IOException { + try { + return hiveDb.getMSC().getCurrentNotificationEventId().getEventId(); + } catch (TException e) { + throw new IOException(e); + } + } + + @Override + public long getDbNotificationEventsCount(long fromEventId, String dbName) throws IOException { + try { + NotificationEventsCountRequest rqst + = new NotificationEventsCountRequest(fromEventId, dbName); + return hiveDb.getMSC().getNotificationEventsCount(rqst).getEventsCount(); + } catch (TException e) { + throw new IOException(e); + } + } + + @Override + public List<NotificationEvent> getNextNotificationEvents( + long pos, IMetaStoreClient.NotificationFilter filter) throws IOException { + try { + return hiveDb.getMSC().getNextNotification(pos,getBatchSize(), filter).getEvents(); + } catch (TException e) { + throw new IOException(e.getMessage(), e); + } + } + } + + public static class NotificationEventIterator implements Iterator<NotificationEvent> { + + private NotificationFetcher nfetcher; + private IMetaStoreClient.NotificationFilter filter; + private int maxEvents; + + private Iterator<NotificationEvent> batchIter = null; + private List<NotificationEvent> batch = null; + private long pos; + private long maxPos; + private int eventCount; + + public NotificationEventIterator( + NotificationFetcher nfetcher, long eventFrom, int maxEvents, + String dbName, String tableName) throws IOException { + init(nfetcher, eventFrom, maxEvents, new DatabaseAndTableFilter(dbName, tableName)); + // using init(..) instead of this(..) because the EventUtils.getDbTblNotificationFilter + // is an operation that needs to run before delegating to the other ctor, and this messes up chaining + // ctors + } + + public NotificationEventIterator( + NotificationFetcher nfetcher, long eventFrom, int maxEvents, + IMetaStoreClient.NotificationFilter filter) throws IOException { + init(nfetcher,eventFrom,maxEvents,filter); + } + + private void init( + NotificationFetcher nfetcher, long eventFrom, int maxEvents, + IMetaStoreClient.NotificationFilter filter) throws IOException { + this.nfetcher = nfetcher; + this.filter = filter; + this.pos = eventFrom; + if (maxEvents < 1){ + // 0 or -1 implies fetch everything + this.maxEvents = Integer.MAX_VALUE; + } else { + this.maxEvents = maxEvents; + } + + this.eventCount = 0; + this.maxPos = nfetcher.getCurrentNotificationEventId(); + } + + private void fetchNextBatch() throws IOException { + batch = nfetcher.getNextNotificationEvents(pos, filter); + int batchSize = nfetcher.getBatchSize(); + while ( ((batch == null) || (batch.isEmpty())) && (pos < maxPos) ){ + // no valid events this batch, but we're still not done processing events + pos += batchSize; + batch = nfetcher.getNextNotificationEvents(pos,filter); + } + + if (batch == null){ + batch = new ArrayList<>(); + // instantiate empty list so that we don't error out on iterator fetching. + // If we're here, then the next check of pos will show our caller that + // that we've exhausted our event supply + } + batchIter = batch.iterator(); + } + + @Override + public boolean hasNext() { + if (eventCount >= maxEvents){ + // If we've already satisfied the number of events we were supposed to deliver, we end it. + return false; + } + if ((batchIter != null) && (batchIter.hasNext())){ + // If we have a valid batchIter and it has more elements, return them. + return true; + } + // If we're here, we want more events, and either batchIter is null, or batchIter + // has reached the end of the current batch. Let's fetch the next batch. + try { + fetchNextBatch(); + } catch (IOException e) { + // Regrettable that we have to wrap the IOException into a RuntimeException, + // but throwing the exception is the appropriate result here, and hasNext() + // signature will only allow RuntimeExceptions. Iterator.hasNext() really + // should have allowed IOExceptions + throw new RuntimeException(e.getMessage(), e); + } + // New batch has been fetched. If it's not empty, we have more elements to process. + return !batch.isEmpty(); + } + + @Override + public NotificationEvent next() { + eventCount++; + NotificationEvent ev = batchIter.next(); + pos = ev.getEventId(); + return ev; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove() not supported on NotificationEventIterator"); + } + + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/d05361e5/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java index c35ca44..010f00c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java @@ -26,15 +26,12 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.messaging.EventUtils; import org.apache.hadoop.hive.ql.metadata.Hive; -import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandler; import org.apache.hadoop.util.ReflectionUtils; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -99,7 +96,7 @@ public class NotificationEventPoll { } EventUtils.MSClientNotificationFetcher evFetcher - = new EventUtils.MSClientNotificationFetcher(Hive.get().getMSC()); + = new EventUtils.MSClientNotificationFetcher(Hive.get()); lastCheckedEventId = evFetcher.getCurrentNotificationEventId(); LOG.info("Initializing lastCheckedEventId to {}", lastCheckedEventId); @@ -135,7 +132,7 @@ public class NotificationEventPoll { // Get any new notification events that have been since the last time we checked, // And pass them on to the event handlers. EventUtils.MSClientNotificationFetcher evFetcher - = new EventUtils.MSClientNotificationFetcher(Hive.get().getMSC()); + = new EventUtils.MSClientNotificationFetcher(Hive.get()); EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(evFetcher, lastCheckedEventId, 0, "*", null); http://git-wip-us.apache.org/repos/asf/hive/blob/d05361e5/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java deleted file mode 100644 index 2b16897..0000000 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hadoop.hive.metastore.messaging; - -import org.apache.hadoop.hive.metastore.IMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter; -import org.apache.thrift.TException; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -public class EventUtils { - - public interface NotificationFetcher { - int getBatchSize() throws IOException; - long getCurrentNotificationEventId() throws IOException; - long getDbNotificationEventsCount(long fromEventId, String dbName) throws IOException; - List<NotificationEvent> getNextNotificationEvents( - long pos, IMetaStoreClient.NotificationFilter filter) throws IOException; - } - - // MetaStoreClient-based impl of NotificationFetcher - public static class MSClientNotificationFetcher implements NotificationFetcher{ - - private IMetaStoreClient msc = null; - private Integer batchSize = null; - - public MSClientNotificationFetcher(IMetaStoreClient msc){ - this.msc = msc; - } - - @Override - public int getBatchSize() throws IOException { - if (batchSize == null){ - try { - batchSize = Integer.parseInt( - msc.getConfigValue(MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX.toString(), "50")); - // TODO: we're asking the metastore what its configuration for this var is - we may - // want to revisit to pull from client side instead. The reason I have it this way - // is because the metastore is more likely to have a reasonable config for this than - // an arbitrary client. - } catch (TException e) { - throw new IOException(e); - } - } - return batchSize; - } - - @Override - public long getCurrentNotificationEventId() throws IOException { - try { - return msc.getCurrentNotificationEventId().getEventId(); - } catch (TException e) { - throw new IOException(e); - } - } - - @Override - public long getDbNotificationEventsCount(long fromEventId, String dbName) throws IOException { - try { - NotificationEventsCountRequest rqst - = new NotificationEventsCountRequest(fromEventId, dbName); - return msc.getNotificationEventsCount(rqst).getEventsCount(); - } catch (TException e) { - throw new IOException(e); - } - } - - @Override - public List<NotificationEvent> getNextNotificationEvents( - long pos, IMetaStoreClient.NotificationFilter filter) throws IOException { - try { - return msc.getNextNotification(pos,getBatchSize(), filter).getEvents(); - } catch (TException e) { - throw new IOException(e.getMessage(), e); - } - } - } - - public static class NotificationEventIterator implements Iterator<NotificationEvent> { - - private NotificationFetcher nfetcher; - private IMetaStoreClient.NotificationFilter filter; - private int maxEvents; - - private Iterator<NotificationEvent> batchIter = null; - private List<NotificationEvent> batch = null; - private long pos; - private long maxPos; - private int eventCount; - - public NotificationEventIterator( - NotificationFetcher nfetcher, long eventFrom, int maxEvents, - String dbName, String tableName) throws IOException { - init(nfetcher, eventFrom, maxEvents, new DatabaseAndTableFilter(dbName, tableName)); - // using init(..) instead of this(..) because the EventUtils.getDbTblNotificationFilter - // is an operation that needs to run before delegating to the other ctor, and this messes up chaining - // ctors - } - - public NotificationEventIterator( - NotificationFetcher nfetcher, long eventFrom, int maxEvents, - IMetaStoreClient.NotificationFilter filter) throws IOException { - init(nfetcher,eventFrom,maxEvents,filter); - } - - private void init( - NotificationFetcher nfetcher, long eventFrom, int maxEvents, - IMetaStoreClient.NotificationFilter filter) throws IOException { - this.nfetcher = nfetcher; - this.filter = filter; - this.pos = eventFrom; - if (maxEvents < 1){ - // 0 or -1 implies fetch everything - this.maxEvents = Integer.MAX_VALUE; - } else { - this.maxEvents = maxEvents; - } - - this.eventCount = 0; - this.maxPos = nfetcher.getCurrentNotificationEventId(); - } - - private void fetchNextBatch() throws IOException { - batch = nfetcher.getNextNotificationEvents(pos, filter); - int batchSize = nfetcher.getBatchSize(); - while ( ((batch == null) || (batch.isEmpty())) && (pos < maxPos) ){ - // no valid events this batch, but we're still not done processing events - pos += batchSize; - batch = nfetcher.getNextNotificationEvents(pos,filter); - } - - if (batch == null){ - batch = new ArrayList<>(); - // instantiate empty list so that we don't error out on iterator fetching. - // If we're here, then the next check of pos will show our caller that - // that we've exhausted our event supply - } - batchIter = batch.iterator(); - } - - @Override - public boolean hasNext() { - if (eventCount >= maxEvents){ - // If we've already satisfied the number of events we were supposed to deliver, we end it. - return false; - } - if ((batchIter != null) && (batchIter.hasNext())){ - // If we have a valid batchIter and it has more elements, return them. - return true; - } - // If we're here, we want more events, and either batchIter is null, or batchIter - // has reached the end of the current batch. Let's fetch the next batch. - try { - fetchNextBatch(); - } catch (IOException e) { - // Regrettable that we have to wrap the IOException into a RuntimeException, - // but throwing the exception is the appropriate result here, and hasNext() - // signature will only allow RuntimeExceptions. Iterator.hasNext() really - // should have allowed IOExceptions - throw new RuntimeException(e.getMessage(), e); - } - // New batch has been fetched. If it's not empty, we have more elements to process. - return !batch.isEmpty(); - } - - @Override - public NotificationEvent next() { - eventCount++; - NotificationEvent ev = batchIter.next(); - pos = ev.getEventId(); - return ev; - } - - @Override - public void remove() { - throw new UnsupportedOperationException("remove() not supported on NotificationEventIterator"); - } - - } -}
