This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 66610de44b7bf316f26de28bb6eb96f6515c8439 Merge: 21fb477 8a966a0 Author: Mike Miller <[email protected]> AuthorDate: Tue Sep 12 16:46:30 2017 -0400 Merge branch '1.8' .../core/client/impl/ConditionalWriterImpl.java | 3 +- .../core/client/impl/InstanceOperationsImpl.java | 3 +- .../accumulo/core/client/impl/MasterClient.java | 3 +- .../core/client/impl/ReplicationClient.java | 3 +- .../core/client/impl/TableOperationsImpl.java | 2 +- .../impl/TabletServerBatchReaderIterator.java | 3 +- .../core/client/impl/TabletServerBatchWriter.java | 2 +- .../accumulo/core/client/impl/ThriftScanner.java | 3 +- .../core/client/impl/ThriftTransportKey.java | 2 +- .../core/client/impl/ThriftTransportPool.java | 2 +- .../apache/accumulo/core/client/impl/Writer.java | 3 +- .../core/metadata/schema/TabletMetadata.java | 2 +- .../accumulo/core/rpc/TTimeoutTransport.java | 5 +- .../org/apache/accumulo/core/rpc/ThriftUtil.java | 13 +- .../org/apache/accumulo/core/summary/Gatherer.java | 2 +- .../org/apache/accumulo/core/util/AddressUtil.java | 2 +- .../org/apache/accumulo/core/util/HostAndPort.java | 276 +++++++++++++++++++++ .../apache/accumulo/core/util/ServerServices.java | 2 +- .../core/client/impl/ThriftTransportKeyTest.java | 3 +- .../main/java/org/apache/accumulo/proxy/Proxy.java | 2 +- .../accumulo/server/client/BulkImporter.java | 3 +- .../accumulo/server/master/LiveTServerSet.java | 3 +- .../server/master/state/SuspendingTServer.java | 3 +- .../server/master/state/TServerInstance.java | 5 +- .../server/master/state/ZooTabletStateStore.java | 3 +- .../apache/accumulo/server/rpc/ServerAddress.java | 3 +- .../apache/accumulo/server/rpc/TServerUtils.java | 21 +- .../org/apache/accumulo/server/util/Admin.java | 2 +- .../apache/accumulo/server/util/LocalityCheck.java | 5 +- .../server/util/VerifyTabletAssignments.java | 2 +- .../accumulo/server/master/LiveTServerSetTest.java | 3 +- .../master/balancer/ChaoticLoadBalancerTest.java | 3 +- 
.../master/balancer/DefaultLoadBalancerTest.java | 3 +- .../master/balancer/TableLoadBalancerTest.java | 2 +- .../apache/accumulo/gc/SimpleGarbageCollector.java | 5 +- .../replication/CloseWriteAheadLogReferences.java | 2 +- .../apache/accumulo/master/tableOps/LoadFiles.java | 3 +- .../MasterReplicationCoordinatorTest.java | 3 +- .../master/state/RootTabletStateStoreTest.java | 3 +- .../java/org/apache/accumulo/monitor/Monitor.java | 3 +- .../apache/accumulo/monitor/ZooKeeperStatus.java | 3 +- .../monitor/rest/master/MasterResource.java | 2 +- .../accumulo/monitor/rest/scans/ScansResource.java | 3 +- .../rest/tservers/TabletServerResource.java | 5 +- .../monitor/util/AccumuloMonitorAppender.java | 5 +- .../org/apache/accumulo/monitor/view/WebViews.java | 2 +- .../org/apache/accumulo/tserver/TabletServer.java | 13 +- .../tserver/replication/AccumuloReplicaSystem.java | 3 +- .../apache/accumulo/tserver/tablet/TabletTest.java | 5 +- .../org/apache/accumulo/test/TotalQueuedIT.java | 3 +- .../org/apache/accumulo/test/WrongTabletTest.java | 2 +- .../test/functional/WatchTheWatchCountIT.java | 5 +- .../accumulo/test/functional/ZombieTServer.java | 3 +- .../apache/accumulo/test/master/MergeStateIT.java | 3 +- .../accumulo/test/master/SuspendedTabletsIT.java | 2 +- .../metadata/MetadataBatchScanTest.java | 2 +- .../test/performance/scan/CollectTabletStats.java | 4 +- .../test/performance/thrift/NullTserver.java | 2 +- .../accumulo/test/proxy/ProxyDurabilityIT.java | 2 +- .../accumulo/test/proxy/SimpleProxyBase.java | 2 +- .../test/proxy/TestProxyInstanceOperations.java | 3 +- .../test/proxy/TestProxyNamespaceOperations.java | 3 +- .../accumulo/test/proxy/TestProxyReadWrite.java | 3 +- .../test/proxy/TestProxySecurityOperations.java | 3 +- .../test/proxy/TestProxyTableOperations.java | 3 +- ...GarbageCollectorCommunicatesWithTServersIT.java | 3 +- .../replication/MultiTserverReplicationIT.java | 2 +- 67 files changed, 371 insertions(+), 136 deletions(-) diff --cc 
core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java index a5b72c1,353cb58..fd37cbd --- a/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java @@@ -140,8 -126,6 +141,7 @@@ import org.slf4j.Logger import org.slf4j.LoggerFactory; import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; - import com.google.common.net.HostAndPort; public class TableOperationsImpl extends TableOperationsHelper { diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java index f3ef127,3d36e69..9d07af2 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java @@@ -39,9 -41,9 +40,8 @@@ import org.slf4j.Logger import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; - import com.google.common.net.HostAndPort; public class ThriftTransportPool { - private static SecurityPermission TRANSPORT_POOL_PERMISSION = new SecurityPermission("transportPoolPermission"); private static final Random random = new Random(); private long killTime = 1000 * 3; diff --cc core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java index f0c758f,0000000..25efcb0 mode 100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java @@@ -1,183 -1,0 +1,183 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.core.metadata.schema; + +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN; + +import java.util.EnumSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.Objects; + +import org.apache.accumulo.core.client.RowIterator; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.impl.Table; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.data.impl.KeyExtent; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily; ++import org.apache.accumulo.core.util.HostAndPort; +import org.apache.hadoop.io.Text; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableList.Builder; +import com.google.common.collect.Iterators; - import 
com.google.common.net.HostAndPort; + +public class TabletMetadata { + + private Table.ID tableId; + private Text prevEndRow; + private Text endRow; + private Location location; + private List<String> files; + private EnumSet<FetchedColumns> fetchedColumns; + private KeyExtent extent; + private Location last; + + public static enum LocationType { + CURRENT, FUTURE, LAST + } + + public static enum FetchedColumns { + LOCATION, PREV_ROW, FILES, LAST + } + + public static class Location { + private final String server; + private final String session; + private final LocationType lt; + + Location(String server, String session, LocationType lt) { + this.server = server; + this.session = session; + this.lt = lt; + } + + public HostAndPort getHostAndPort() { + return HostAndPort.fromString(server); + } + + public String getSession() { + return session; + } + + public LocationType getLocationType() { + return lt; + } + } + + public Table.ID getTableId() { + return tableId; + } + + public KeyExtent getExtent() { + if (extent == null) { + extent = new KeyExtent(getTableId(), getEndRow(), getPrevEndRow()); + } + return extent; + } + + public Text getPrevEndRow() { + Preconditions.checkState(fetchedColumns.contains(FetchedColumns.PREV_ROW), "Requested prev row when it was not fetched"); + return prevEndRow; + } + + public Text getEndRow() { + return endRow; + } + + public Location getLocation() { + Preconditions.checkState(fetchedColumns.contains(FetchedColumns.LOCATION), "Requested location when it was not fetched"); + return location; + } + + public Location getLast() { + Preconditions.checkState(fetchedColumns.contains(FetchedColumns.LAST), "Requested last when it was not fetched"); + return last; + } + + public List<String> getFiles() { + Preconditions.checkState(fetchedColumns.contains(FetchedColumns.FILES), "Requested files when it was not fetched"); + return files; + } + + public static TabletMetadata convertRow(Iterator<Entry<Key,Value>> rowIter, EnumSet<FetchedColumns> 
fetchedColumns) { + Objects.requireNonNull(rowIter); + + TabletMetadata te = new TabletMetadata(); + + Builder<String> filesBuilder = ImmutableList.builder(); + ByteSequence row = null; + + while (rowIter.hasNext()) { + Entry<Key,Value> kv = rowIter.next(); + Key k = kv.getKey(); + Value v = kv.getValue(); + Text fam = k.getColumnFamily(); + + if (row == null) { + row = k.getRowData(); + KeyExtent ke = new KeyExtent(k.getRow(), (Text) null); + te.endRow = ke.getEndRow(); + te.tableId = ke.getTableId(); + } else if (!row.equals(k.getRowData())) { + throw new IllegalArgumentException("Input contains more than one row : " + row + " " + k.getRowData()); + } + + if (PREV_ROW_COLUMN.hasColumns(k)) { + te.prevEndRow = KeyExtent.decodePrevEndRow(v); + } + + if (fam.equals(DataFileColumnFamily.NAME)) { + filesBuilder.add(k.getColumnQualifier().toString()); + } else if (fam.equals(CurrentLocationColumnFamily.NAME)) { + if (te.location != null) { + throw new IllegalArgumentException("Input contains more than one location " + te.location + " " + v); + } + te.location = new Location(v.toString(), k.getColumnQualifierData().toString(), LocationType.CURRENT); + } else if (fam.equals(FutureLocationColumnFamily.NAME)) { + if (te.location != null) { + throw new IllegalArgumentException("Input contains more than one location " + te.location + " " + v); + } + te.location = new Location(v.toString(), k.getColumnQualifierData().toString(), LocationType.FUTURE); + } else if (fam.equals(LastLocationColumnFamily.NAME)) { + te.last = new Location(v.toString(), k.getColumnQualifierData().toString(), LocationType.LAST); + } + } + + te.files = filesBuilder.build(); + te.fetchedColumns = fetchedColumns; + return te; + } + + public static Iterable<TabletMetadata> convert(Scanner input, EnumSet<FetchedColumns> fetchedColumns) { + return new Iterable<TabletMetadata>() { + @Override + public Iterator<TabletMetadata> iterator() { + RowIterator rowIter = new RowIterator(input); + return 
Iterators.transform(rowIter, ri -> convertRow(ri, fetchedColumns)); + } + }; + } +} diff --cc core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java index 809975f,e93d457..7dc079c --- a/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java +++ b/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java @@@ -30,31 -31,14 +31,29 @@@ import org.apache.accumulo.core.util.Ho import org.apache.hadoop.net.NetUtils; import org.apache.thrift.transport.TIOStreamTransport; import org.apache.thrift.transport.TTransport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; - import com.google.common.net.HostAndPort; - +/** + * A utility class for setting up a {@link TTransport} with various necessary configurations for ideal performance in Accumulo. These configurations include: + * <ul> + * <li>Setting SO_LINGER=false on the socket.</li> + * <li>Setting TCP_NO_DELAY=true on the socket.</li> + * <li>Setting timeouts on the I/OStreams.</li> + * </ul> + */ public class TTimeoutTransport { + private static final Logger log = LoggerFactory.getLogger(TTimeoutTransport.class); + + private static final TTimeoutTransport INSTANCE = new TTimeoutTransport(); + + private volatile Method GET_INPUT_STREAM_METHOD = null; - private static volatile Method GET_INPUT_STREAM_METHOD = null; + private TTimeoutTransport() {} - private static Method getNetUtilsInputStreamMethod() { + private Method getNetUtilsInputStreamMethod() { if (null == GET_INPUT_STREAM_METHOD) { - synchronized (TTimeoutTransport.class) { + synchronized (this) { if (null == GET_INPUT_STREAM_METHOD) { try { GET_INPUT_STREAM_METHOD = NetUtils.class.getMethod("getInputStream", Socket.class, Long.TYPE); @@@ -96,96 -60,14 +95,96 @@@ } } + /** + * Creates a Thrift TTransport to the given address with the given timeout. All created resources are closed if an exception is thrown. 
+ * + * @param addr + * The address to connect the client to + * @param timeoutMillis + * The timeout in milliseconds for the connection + * @return A TTransport connected to the given <code>addr</code> + * @throws IOException + * If the transport fails to be created/connected + */ public static TTransport create(HostAndPort addr, long timeoutMillis) throws IOException { - return INSTANCE.createInternal(new InetSocketAddress(addr.getHostText(), addr.getPort()), timeoutMillis); - return create(new InetSocketAddress(addr.getHost(), addr.getPort()), timeoutMillis); ++ return INSTANCE.createInternal(new InetSocketAddress(addr.getHost(), addr.getPort()), timeoutMillis); } + /** + * Creates a Thrift TTransport to the given address with the given timeout. All created resources are closed if an exception is thrown. + * + * @param addr + * The address to connect the client to + * @param timeoutMillis + * The timeout in milliseconds for the connection + * @return A TTransport connected to the given <code>addr</code> + * @throws IOException + * If the transport fails to be created/connected + */ public static TTransport create(SocketAddress addr, long timeoutMillis) throws IOException { + return INSTANCE.createInternal(addr, timeoutMillis); + } + + /** + * Opens a socket to the given <code>addr</code>, configures the socket, and then creates a Thrift transport using the socket. 
+ * + * @param addr + * The address the socket should connect + * @param timeoutMillis + * The socket timeout in milliseconds + * @return A TTransport instance to the given <code>addr</code> + * @throws IOException + * If the Thrift client is failed to be connected/created + */ + protected TTransport createInternal(SocketAddress addr, long timeoutMillis) throws IOException { Socket socket = null; try { - socket = SelectorProvider.provider().openSocketChannel().socket(); + socket = openSocket(addr); + } catch (IOException e) { + // openSocket handles closing the Socket on error + throw e; + } + + // Should be non-null + assert null != socket; + + // Set up the streams + try { + InputStream input = wrapInputStream(socket, timeoutMillis); + OutputStream output = wrapOutputStream(socket, timeoutMillis); + return new TIOStreamTransport(input, output); + } catch (IOException e) { + try { + socket.close(); + } catch (IOException ioe) { + log.error("Failed to close socket after unsuccessful I/O stream setup", e); + } + + throw e; + } + } + + // Visible for testing + protected InputStream wrapInputStream(Socket socket, long timeoutMillis) throws IOException { + return new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10); + } + + // Visible for testing + protected OutputStream wrapOutputStream(Socket socket, long timeoutMillis) throws IOException { + return new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10); + } + + /** + * Opens and configures a {@link Socket} for Accumulo RPC. 
+ * + * @param addr + * The address to connect the socket to + * @return A socket connected to the given address, or null if the socket fails to connect + */ + protected Socket openSocket(SocketAddress addr) throws IOException { + Socket socket = null; + try { + socket = openSocketChannel(); socket.setSoLinger(false, 0); socket.setTcpNoDelay(true); socket.connect(addr); diff --cc core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java index 45edd1a,0000000..a5fef9b mode 100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java +++ b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java @@@ -1,632 -1,0 +1,632 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.accumulo.core.summary; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.impl.ClientContext; +import org.apache.accumulo.core.client.impl.ServerClient; +import org.apache.accumulo.core.client.impl.Table; +import org.apache.accumulo.core.client.summary.SummarizerConfiguration; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.impl.KeyExtent; +import org.apache.accumulo.core.data.thrift.TRowRange; +import org.apache.accumulo.core.data.thrift.TSummaries; +import org.apache.accumulo.core.data.thrift.TSummaryRequest; +import org.apache.accumulo.core.file.blockfile.cache.BlockCache; +import org.apache.accumulo.core.metadata.schema.MetadataScanner; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.rpc.ThriftUtil; +import 
org.apache.accumulo.core.tabletserver.thrift.TabletClientService; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client; +import org.apache.accumulo.core.trace.Tracer; +import org.apache.accumulo.core.trace.thrift.TInfo; +import org.apache.accumulo.core.util.ByteBufferUtil; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.core.util.CancelFlagFuture; +import org.apache.accumulo.core.util.CompletableFutureUtil; ++import org.apache.accumulo.core.util.HostAndPort; +import org.apache.accumulo.core.util.TextUtil; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.thrift.TApplicationException; +import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.hash.Hashing; - import com.google.common.net.HostAndPort; + +/** + * This class implements using multiple tservers to gather summaries. + * + * Below is a rough outline of the RPC process. + * + * <ol> + * <li>Clients pick a random tserver and make an RPC to remotely execute {@link #gather(ExecutorService)}. 
+ * <li> {@link #gather(ExecutorService)} will call make RPC calls to multiple tservers to remotely execute {@link #processPartition(ExecutorService, int, int)} + * <li> {@link #processPartition(ExecutorService, int, int)} will make RPC calls to multiple tserver to remotely execute + * <li> {@link #processFiles(FileSystemResolver, Map, BlockCache, BlockCache, ExecutorService)} + * </ol> + */ +public class Gatherer { + + private static final Logger log = LoggerFactory.getLogger(Gatherer.class); + + private ClientContext ctx; + private Table.ID tableId; + private SummarizerFactory factory; + private Text startRow = null; + private Text endRow = null; + private Range clipRange; + private Predicate<SummarizerConfiguration> summarySelector; + + private TSummaryRequest request; + + private String summarizerPattern; + + private Set<SummarizerConfiguration> summaries; + + public Gatherer(ClientContext context, TSummaryRequest request, AccumuloConfiguration tableConfig) { + this.ctx = context; + this.tableId = Table.ID.of(request.tableId); + this.startRow = ByteBufferUtil.toText(request.bounds.startRow); + this.endRow = ByteBufferUtil.toText(request.bounds.endRow); + this.clipRange = new Range(startRow, false, endRow, true); + this.summaries = request.getSummarizers().stream().map(SummarizerConfigurationUtil::fromThrift).collect(Collectors.toSet()); + this.request = request; + + this.summarizerPattern = request.getSummarizerPattern(); + + if (summarizerPattern != null) { + Pattern pattern = Pattern.compile(summarizerPattern); + // The way conf is converted to string below is documented in the API, so consider this when making changes! 
+ summarySelector = conf -> pattern.matcher(conf.getClassName() + " " + new TreeMap<>(conf.getOptions())).matches(); + if (!summaries.isEmpty()) { + summarySelector = summarySelector.or(conf -> summaries.contains(conf)); + } + } else if (!summaries.isEmpty()) { + summarySelector = conf -> summaries.contains(conf); + } else { + summarySelector = conf -> true; + } + + this.factory = new SummarizerFactory(tableConfig); + } + + private TSummaryRequest getRequest() { + return request; + } + + /** + * @param fileSelector + * only returns files that match this predicate + * @return A map of the form : {@code map<tserver location, map<path, list<range>>} . The ranges associated with a file represent the tablets that use the + * file. + */ + private Map<String,Map<String,List<TRowRange>>> getFilesGroupedByLocation(Predicate<String> fileSelector) throws TableNotFoundException, AccumuloException, + AccumuloSecurityException { + + Iterable<TabletMetadata> tmi = MetadataScanner.builder().from(ctx).overUserTableId(tableId, startRow, endRow).fetchFiles().fetchLocation().fetchLast() + .fetchPrev().build(); + + // get a subset of files + Map<String,List<TabletMetadata>> files = new HashMap<>(); + for (TabletMetadata tm : tmi) { + for (String file : tm.getFiles()) { + if (fileSelector.test(file)) { + // TODO push this filtering to server side and possibly use batch scanner + files.computeIfAbsent(file, s -> new ArrayList<>()).add(tm); + } + } + } + + // group by location, then file + + Map<String,Map<String,List<TRowRange>>> locations = new HashMap<>(); + + List<String> tservers = null; + + for (Entry<String,List<TabletMetadata>> entry : files.entrySet()) { + + String location = entry.getValue().stream().filter(tm -> tm.getLocation() != null) // filter tablets w/o a location + .map(tm -> tm.getLocation().getHostAndPort().toString()) // convert to host:port strings + .min(String::compareTo) // find minimum host:port + .orElse(entry.getValue().stream().filter(tm -> tm.getLast() != 
null) // if no locations, then look at last locations + .map(tm -> tm.getLast().getHostAndPort().toString()) // convert to host:port strings + .min(String::compareTo).orElse(null)); // find minimum last location or return null + + if (location == null) { + if (tservers == null) { + tservers = ctx.getConnector().instanceOperations().getTabletServers(); + Collections.sort(tservers); + } + + // When no location, the approach below will consistently choose the same tserver for the same file (as long as the set of tservers is stable). + int idx = Math.abs(Hashing.murmur3_32().hashString(entry.getKey()).asInt()) % tservers.size(); + location = tservers.get(idx); + } + + List<Range> merged = Range.mergeOverlapping(Lists.transform(entry.getValue(), tm -> tm.getExtent().toDataRange())); // merge contiguous ranges + List<TRowRange> ranges = merged.stream().map(r -> toClippedExtent(r).toThrift()).collect(Collectors.toList()); // clip ranges to queried range + + locations.computeIfAbsent(location, s -> new HashMap<>()).put(entry.getKey(), ranges); + } + + return locations; + } + + private <K,V> Iterable<Map<K,V>> partition(Map<K,V> map, int max) { + + if (map.size() < max) { + return Collections.singletonList(map); + } + + return new Iterable<Map<K,V>>() { + @Override + public Iterator<Map<K,V>> iterator() { + Iterator<Entry<K,V>> esi = map.entrySet().iterator(); + + return new Iterator<Map<K,V>>() { + @Override + public boolean hasNext() { + return esi.hasNext(); + } + + @Override + public Map<K,V> next() { + Map<K,V> workingMap = new HashMap<>(max); + while (esi.hasNext() && workingMap.size() < max) { + Entry<K,V> entry = esi.next(); + workingMap.put(entry.getKey(), entry.getValue()); + } + return workingMap; + } + }; + } + }; + } + + private static class ProcessedFiles { + final SummaryCollection summaries; + final Set<String> failedFiles; + + public ProcessedFiles() { + this.summaries = new SummaryCollection(); + this.failedFiles = new HashSet<>(); + } + + public 
ProcessedFiles(SummaryCollection summaries, SummarizerFactory factory) { + this(); + this.summaries.merge(summaries, factory); + } + + static ProcessedFiles merge(ProcessedFiles pf1, ProcessedFiles pf2, SummarizerFactory factory) { + ProcessedFiles ret = new ProcessedFiles(); + ret.failedFiles.addAll(pf1.failedFiles); + ret.failedFiles.addAll(pf2.failedFiles); + ret.summaries.merge(pf1.summaries, factory); + ret.summaries.merge(pf2.summaries, factory); + return ret; + } + } + + private class FilesProcessor implements Supplier<ProcessedFiles> { + + HostAndPort location; + Map<String,List<TRowRange>> allFiles; + private TInfo tinfo; + private AtomicBoolean cancelFlag; + + public FilesProcessor(TInfo tinfo, HostAndPort location, Map<String,List<TRowRange>> allFiles, AtomicBoolean cancelFlag) { + this.location = location; + this.allFiles = allFiles; + this.tinfo = tinfo; + this.cancelFlag = cancelFlag; + } + + @Override + public ProcessedFiles get() { + ProcessedFiles pfiles = new ProcessedFiles(); + + Client client = null; + try { + client = ThriftUtil.getTServerClient(location, ctx); + // partition files into smaller chunks so that not too many are sent to a tserver at once + for (Map<String,List<TRowRange>> files : partition(allFiles, 500)) { + if (pfiles.failedFiles.size() > 0) { + // there was a previous failure on this tserver, so just fail the rest of the files + pfiles.failedFiles.addAll(files.keySet()); + continue; + } + + try { + TSummaries tSums = client.startGetSummariesFromFiles(tinfo, ctx.rpcCreds(), getRequest(), files); + while (!tSums.finished && !cancelFlag.get()) { + tSums = client.contiuneGetSummaries(tinfo, tSums.sessionId); + } + + pfiles.summaries.merge(new SummaryCollection(tSums), factory); + } catch (TApplicationException tae) { + throw new RuntimeException(tae); + } catch (TTransportException e) { + pfiles.failedFiles.addAll(files.keySet()); + continue; + } catch (TException e) { + throw new RuntimeException(e); + } + + } + } catch 
(TTransportException e1) { + pfiles.failedFiles.addAll(allFiles.keySet()); + } finally { + ThriftUtil.returnClient(client); + } + + if (cancelFlag.get()) { + throw new RuntimeException("Operation canceled"); + } + + return pfiles; + } + } + + private class PartitionFuture implements Future<SummaryCollection> { + + private CompletableFuture<ProcessedFiles> future; + private int modulus; + private int remainder; + private ExecutorService execSrv; + private TInfo tinfo; + private AtomicBoolean cancelFlag = new AtomicBoolean(false); + + PartitionFuture(TInfo tinfo, ExecutorService execSrv, int modulus, int remainder) { + this.tinfo = tinfo; + this.execSrv = execSrv; + this.modulus = modulus; + this.remainder = remainder; + } + + private synchronized void initiateProcessing(ProcessedFiles previousWork) { + try { + Predicate<String> fileSelector = file -> Math.abs(Hashing.murmur3_32().hashString(file).asInt()) % modulus == remainder; + if (previousWork != null) { + fileSelector = fileSelector.and(file -> previousWork.failedFiles.contains(file)); + } + Map<String,Map<String,List<TRowRange>>> filesGBL; + filesGBL = getFilesGroupedByLocation(fileSelector); + + List<CompletableFuture<ProcessedFiles>> futures = new ArrayList<>(); + if (previousWork != null) { + futures.add(CompletableFuture.completedFuture(new ProcessedFiles(previousWork.summaries, factory))); + } + + for (Entry<String,Map<String,List<TRowRange>>> entry : filesGBL.entrySet()) { + HostAndPort location = HostAndPort.fromString(entry.getKey()); + Map<String,List<TRowRange>> allFiles = entry.getValue(); + + futures.add(CompletableFuture.supplyAsync(new FilesProcessor(tinfo, location, allFiles, cancelFlag), execSrv)); + } + + future = CompletableFutureUtil.merge(futures, (pf1, pf2) -> ProcessedFiles.merge(pf1, pf2, factory), ProcessedFiles::new); + + // when all processing is done, check for failed files... 
and if found starting processing again + future.thenRun(() -> updateFuture()); + } catch (Exception e) { + future = CompletableFuture.completedFuture(new ProcessedFiles()); + // force future to have this exception + future.obtrudeException(e); + } + } + + private ProcessedFiles _get() { + try { + return future.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + private synchronized CompletableFuture<ProcessedFiles> updateFuture() { + if (future.isDone()) { + if (!future.isCancelled() && !future.isCompletedExceptionally()) { + ProcessedFiles pf = _get(); + if (pf.failedFiles.size() > 0) { + initiateProcessing(pf); + } + } + } + + return future; + } + + synchronized void initiateProcessing() { + Preconditions.checkState(future == null); + initiateProcessing(null); + } + + @Override + public synchronized boolean cancel(boolean mayInterruptIfRunning) { + boolean canceled = future.cancel(mayInterruptIfRunning); + if (canceled) { + cancelFlag.set(true); + } + return canceled; + } + + @Override + public synchronized boolean isCancelled() { + return future.isCancelled(); + } + + @Override + public synchronized boolean isDone() { + updateFuture(); + if (future.isDone()) { + if (future.isCancelled() || future.isCompletedExceptionally()) { + return true; + } + + ProcessedFiles pf = _get(); + if (pf.failedFiles.size() == 0) { + return true; + } else { + updateFuture(); + } + } + + return false; + } + + @Override + public SummaryCollection get() throws InterruptedException, ExecutionException { + CompletableFuture<ProcessedFiles> futureRef = updateFuture(); + ProcessedFiles processedFiles = futureRef.get(); + while (processedFiles.failedFiles.size() > 0) { + futureRef = updateFuture(); + processedFiles = futureRef.get(); + } + return processedFiles.summaries; + } + + @Override + public SummaryCollection get(long timeout, TimeUnit unit) 
throws InterruptedException, ExecutionException, TimeoutException { + long nanosLeft = unit.toNanos(timeout); + long t1, t2; + CompletableFuture<ProcessedFiles> futureRef = updateFuture(); + t1 = System.nanoTime(); + ProcessedFiles processedFiles = futureRef.get(Long.max(1, nanosLeft), TimeUnit.NANOSECONDS); + t2 = System.nanoTime(); + nanosLeft -= (t2 - t1); + while (processedFiles.failedFiles.size() > 0) { + futureRef = updateFuture(); + t1 = System.nanoTime(); + processedFiles = futureRef.get(Long.max(1, nanosLeft), TimeUnit.NANOSECONDS); + t2 = System.nanoTime(); + nanosLeft -= (t2 - t1); + } + return processedFiles.summaries; + } + + } + + /** + * This method reads a subset of file paths into memory and groups them by location. Then it requests summaries for files from each location/tablet server. + */ + public Future<SummaryCollection> processPartition(ExecutorService execSrv, int modulus, int remainder) { + PartitionFuture future = new PartitionFuture(Tracer.traceInfo(), execSrv, modulus, remainder); + future.initiateProcessing(); + return future; + } + + public static interface FileSystemResolver { + FileSystem get(Path file); + } + + /** + * This method will read summaries from a set of files. 
+ */ + public Future<SummaryCollection> processFiles(FileSystemResolver volMgr, Map<String,List<TRowRange>> files, BlockCache summaryCache, BlockCache indexCache, + ExecutorService srp) { + List<CompletableFuture<SummaryCollection>> futures = new ArrayList<>(); + for (Entry<String,List<TRowRange>> entry : files.entrySet()) { + futures.add(CompletableFuture.supplyAsync(() -> { + List<RowRange> rrl = Lists.transform(entry.getValue(), RowRange::new); + return getSummaries(volMgr, entry.getKey(), rrl, summaryCache, indexCache); + }, srp)); + } + + return CompletableFutureUtil.merge(futures, (sc1, sc2) -> SummaryCollection.merge(sc1, sc2, factory), SummaryCollection::new); + } + + private int countFiles() throws TableNotFoundException, AccumuloException, AccumuloSecurityException { + // TODO use a batch scanner + iterator to parallelize counting files + Iterable<TabletMetadata> tmi = MetadataScanner.builder().from(ctx).overUserTableId(tableId, startRow, endRow).fetchFiles().fetchPrev().build(); + return StreamSupport.stream(tmi.spliterator(), false).mapToInt(tm -> tm.getFiles().size()).sum(); + } + + private class GatherRequest implements Supplier<SummaryCollection> { + + private int remainder; + private int modulus; + private TInfo tinfo; + private AtomicBoolean cancelFlag; + + GatherRequest(TInfo tinfo, int remainder, int modulus, AtomicBoolean cancelFlag) { + this.remainder = remainder; + this.modulus = modulus; + this.tinfo = tinfo; + this.cancelFlag = cancelFlag; + } + + @Override + public SummaryCollection get() { + TSummaryRequest req = getRequest(); + + TSummaries tSums; + try { + tSums = ServerClient.execute(ctx, new TabletClientService.Client.Factory(), client -> { + TSummaries tsr = client.startGetSummariesForPartition(tinfo, ctx.rpcCreds(), req, modulus, remainder); + while (!tsr.finished && !cancelFlag.get()) { + tsr = client.contiuneGetSummaries(tinfo, tsr.sessionId); + } + return tsr; + }); + } catch (AccumuloException | AccumuloSecurityException e) { + 
throw new RuntimeException(e); + } + + if (cancelFlag.get()) { + throw new RuntimeException("Operation canceled"); + } + + return new SummaryCollection(tSums); + } + } + + public Future<SummaryCollection> gather(ExecutorService es) { + int numFiles; + try { + numFiles = countFiles(); + } catch (TableNotFoundException | AccumuloException | AccumuloSecurityException e) { + throw new RuntimeException(e); + } + + log.debug("Gathering summaries from {} files", numFiles); + + if (numFiles == 0) { + return CompletableFuture.completedFuture(new SummaryCollection()); + } + + // have each tablet server process ~100K files + int numRequest = Math.max(numFiles / 100_000, 1); + + List<CompletableFuture<SummaryCollection>> futures = new ArrayList<>(); + + AtomicBoolean cancelFlag = new AtomicBoolean(false); + + TInfo tinfo = Tracer.traceInfo(); + for (int i = 0; i < numRequest; i++) { + futures.add(CompletableFuture.supplyAsync(new GatherRequest(tinfo, i, numRequest, cancelFlag), es)); + } + + Future<SummaryCollection> future = CompletableFutureUtil.merge(futures, (sc1, sc2) -> SummaryCollection.merge(sc1, sc2, factory), SummaryCollection::new); + return new CancelFlagFuture<>(future, cancelFlag); + } + + private static Text removeTrailingZeroFromRow(Key k) { + if (k != null) { + Text t = new Text(); + ByteSequence row = k.getRowData(); + Preconditions.checkArgument(row.length() >= 1 && row.byteAt(row.length() - 1) == 0); + t.set(row.getBackingArray(), row.offset(), row.length() - 1); + return t; + } else { + return null; + } + } + + private RowRange toClippedExtent(Range r) { + r = clipRange.clip(r); + + Text startRow = removeTrailingZeroFromRow(r.getStartKey()); + Text endRow = removeTrailingZeroFromRow(r.getEndKey()); + + return new RowRange(startRow, endRow); + } + + public static class RowRange { + private Text startRow; + private Text endRow; + + public RowRange(KeyExtent ke) { + this.startRow = ke.getPrevEndRow(); + this.endRow = ke.getEndRow(); + } + + public 
RowRange(TRowRange trr) { + this.startRow = ByteBufferUtil.toText(trr.startRow); + this.endRow = ByteBufferUtil.toText(trr.endRow); + } + + public RowRange(Text startRow, Text endRow) { + this.startRow = startRow; + this.endRow = endRow; + } + + public Range toRange() { + return new Range(startRow, false, endRow, true); + } + + public TRowRange toThrift() { + return new TRowRange(TextUtil.getByteBuffer(startRow), TextUtil.getByteBuffer(endRow)); + } + + public Text getStartRow() { + return startRow; + } + + public Text getEndRow() { + return endRow; + } + + public String toString() { + return startRow + " " + endRow; + } + } + + private SummaryCollection getSummaries(FileSystemResolver volMgr, String file, List<RowRange> ranges, BlockCache summaryCache, BlockCache indexCache) { + Path path = new Path(file); + Configuration conf = CachedConfiguration.getInstance(); + return SummaryReader.load(volMgr.get(path), conf, ctx.getConfiguration(), factory, path, summarySelector, summaryCache, indexCache).getSummaries(ranges); + } +} diff --cc core/src/main/java/org/apache/accumulo/core/util/HostAndPort.java index 0000000,59cb652..d01469d mode 000000,100644..100644 --- a/core/src/main/java/org/apache/accumulo/core/util/HostAndPort.java +++ b/core/src/main/java/org/apache/accumulo/core/util/HostAndPort.java @@@ -1,0 -1,268 +1,276 @@@ + /* + * Copyright (C) 2011 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ + + package org.apache.accumulo.core.util; + + import static com.google.common.base.Preconditions.checkArgument; + import static com.google.common.base.Preconditions.checkState; + + import java.io.Serializable; + + import com.google.common.base.Strings; + + /** + * This class was copied from Guava release 23.0 to replace the older Guava 14 version that had been used in Accumulo. It was annotated as Beta by Google, + * therefore unstable to use in a core Accumulo library. We learned this the hard way when Guava version 20 deprecated the getHostText method and then removed + * the method altogether in version 22. See ACCUMULO-4702 + * + * Unused methods and annotations were removed to reduce maintenance costs. + * + * Javadoc from Guava 23.0 release: An immutable representation of a host and port. + * + * <p> + * Example usage: + * + * <pre> + * HostAndPort hp = HostAndPort.fromString("[2001:db8::1]").withDefaultPort(80); + * hp.getHost(); // returns "2001:db8::1" + * hp.getPort(); // returns 80 + * hp.toString(); // returns "[2001:db8::1]:80" + * </pre> + * + * <p> + * Here are some examples of recognized formats: + * <ul> + * <li>example.com + * <li>example.com:80 + * <li>192.0.2.1 + * <li>192.0.2.1:80 + * <li>[2001:db8::1] - {@link #getHost()} omits brackets + * <li>[2001:db8::1]:80 - {@link #getHost()} omits brackets + * <li>2001:db8::1 - accepted as a bracketless IPv6 literal (Guava's requireBracketsForIPv6() was removed from this copy) + * </ul> + * + * <p> + * Note that this is not an exhaustive list, because these methods are only concerned with brackets, colons, and port numbers. Full validation of the host field + * (if desired) is the caller's responsibility. + * + * @author Paul Marks + * @since 10.0 + */ + + public final class HostAndPort implements Serializable { + /** Magic value indicating the absence of a port number. */ + private static final int NO_PORT = -1; + + /** Hostname, IPv4/IPv6 literal, or unvalidated nonsense. 
*/ + private final String host; + + /** Validated port number in the range [0..65535], or NO_PORT */ + private final int port; + + /** True if the parsed host has colons, but no surrounding brackets. */ + private final boolean hasBracketlessColons; + + private HostAndPort(String host, int port, boolean hasBracketlessColons) { + this.host = host; + this.port = port; + this.hasBracketlessColons = hasBracketlessColons; + } + + /** + * Returns the portion of this {@code HostAndPort} instance that should represent the hostname or IPv4/IPv6 literal. + * + * <p> + * A successful parse does not imply any degree of sanity in this field. For additional validation, see the HostSpecifier class. + * + * @since 20.0 (since 10.0 as {@code getHostText}) + */ + public String getHost() { + return host; + } + + /** Return true if this instance has a defined port. */ + public boolean hasPort() { + return port >= 0; + } + + /** + * Get the current port number, failing if no port is defined. + * + * @return a validated port number, in the range [0..65535] + * @throws IllegalStateException + * if no port is defined. You can use {@link #withDefaultPort(int)} to prevent this from occurring. + */ + public int getPort() { + checkState(hasPort()); + return port; + } + + /** + * Build a HostAndPort instance from separate host and port values. + * + * <p> + * Note: Non-bracketed IPv6 literals are allowed; this copy has no requireBracketsForIPv6() to prohibit them. + * + * @param host + * the host string to parse. Must not contain a port number. + * @param port + * a port number from [0..65535] + * @return if parsing was successful, a populated HostAndPort object. + * @throws IllegalArgumentException + * if {@code host} contains a port number, or {@code port} is out of range. 
+ */ + public static HostAndPort fromParts(String host, int port) { + checkArgument(isValidPort(port), "Port out of range: %s", port); + HostAndPort parsedHost = fromString(host); + checkArgument(!parsedHost.hasPort(), "Host has a port: %s", host); + return new HostAndPort(parsedHost.host, port, parsedHost.hasBracketlessColons); + } + + /** + * Split a freeform string into a host and port, without strict validation. + * + * Note that the host-only formats will leave the port field undefined. You can use {@link #withDefaultPort(int)} to patch in a default value. + * + * @param hostPortString + * the input string to parse. + * @return if parsing was successful, a populated HostAndPort object. + * @throws IllegalArgumentException + * if nothing meaningful could be parsed. + */ + public static HostAndPort fromString(String hostPortString) { + java.util.Objects.requireNonNull(hostPortString); + String host; + String portString = null; + boolean hasBracketlessColons = false; + + if (hostPortString.startsWith("[")) { + String[] hostAndPort = getHostAndPortFromBracketedHost(hostPortString); + host = hostAndPort[0]; + portString = hostAndPort[1]; + } else { + int colonPos = hostPortString.indexOf(':'); + if (colonPos >= 0 && hostPortString.indexOf(':', colonPos + 1) == -1) { + // Exactly 1 colon. Split into host:port. + host = hostPortString.substring(0, colonPos); + portString = hostPortString.substring(colonPos + 1); + } else { + // 0 or 2+ colons. Bare hostname or IPv6 literal. + host = hostPortString; + hasBracketlessColons = (colonPos >= 0); + } + } + + int port = NO_PORT; + if (!Strings.isNullOrEmpty(portString)) { + // Try to parse the whole port string as a number. + // JDK7 accepts leading plus signs. We don't want to. 
+ checkArgument(!portString.startsWith("+"), "Unparseable port number: %s", hostPortString); + try { + port = Integer.parseInt(portString); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Unparseable port number: " + hostPortString); + } + checkArgument(isValidPort(port), "Port number out of range: %s", hostPortString); + } + + return new HostAndPort(host, port, hasBracketlessColons); + } + + /** + * Parses a bracketed host-port string, throwing IllegalArgumentException if parsing fails. + * + * @param hostPortString + * the full bracketed host-port specification. Port might not be specified. + * @return an array with 2 strings: host and port, in that order. + * @throws IllegalArgumentException + * if parsing the bracketed host-port string fails. + */ + private static String[] getHostAndPortFromBracketedHost(String hostPortString) { + int colonIndex = 0; + int closeBracketIndex = 0; + checkArgument(hostPortString.charAt(0) == '[', "Bracketed host-port string must start with a bracket: %s", hostPortString); + colonIndex = hostPortString.indexOf(':'); + closeBracketIndex = hostPortString.lastIndexOf(']'); + checkArgument(colonIndex > -1 && closeBracketIndex > colonIndex, "Invalid bracketed host/port: %s", hostPortString); + + String host = hostPortString.substring(1, closeBracketIndex); + if (closeBracketIndex + 1 == hostPortString.length()) { + return new String[] {host, ""}; + } else { + checkArgument(hostPortString.charAt(closeBracketIndex + 1) == ':', "Only a colon may follow a close bracket: %s", hostPortString); + for (int i = closeBracketIndex + 2; i < hostPortString.length(); ++i) { + checkArgument(Character.isDigit(hostPortString.charAt(i)), "Port must be numeric: %s", hostPortString); + } + return new String[] {host, hostPortString.substring(closeBracketIndex + 2)}; + } + } + + /** + * Provide a default port if the parsed string contained only a host. 
+ * + * You can chain this after {@link #fromString(String)} to include a port in case the port was omitted from the input string. If a port was already provided, + * then this method is a no-op. + * + * @param defaultPort + * a port number, from [0..65535] + * @return a HostAndPort instance, guaranteed to have a defined port. + */ + public HostAndPort withDefaultPort(int defaultPort) { + checkArgument(isValidPort(defaultPort)); + if (hasPort() || port == defaultPort) { + return this; + } + return new HostAndPort(host, defaultPort, hasBracketlessColons); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (other instanceof HostAndPort) { + HostAndPort that = (HostAndPort) other; + return java.util.Objects.equals(this.host, that.host) && this.port == that.port && this.hasBracketlessColons == that.hasBracketlessColons; + } + return false; + } + + @Override + public int hashCode() { + return java.util.Objects.hash(host, port, hasBracketlessColons); + } + + /** Rebuild the host:port string, including brackets if necessary. */ + @Override + public String toString() { + // "[]:12345" requires 8 extra bytes. + StringBuilder builder = new StringBuilder(host.length() + 8); + if (host.indexOf(':') >= 0) { + builder.append('[').append(host).append(']'); + } else { + builder.append(host); + } + if (hasPort()) { + builder.append(':').append(port); + } + return builder.toString(); + } + + /** Return true for valid port numbers. */ + private static boolean isValidPort(int port) { + return port >= 0 && port <= 65535; + } + + private static final long serialVersionUID = 0; ++ ++ /** ++ * Returns the current port number, with a default if no port is defined. ++ */ ++ public int getPortOrDefault(int defaultPort) { ++ return hasPort() ? 
port : defaultPort; ++ } ++ + } diff --cc proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java index fcbef6b,54fd03b..0924920 --- a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java +++ b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java @@@ -27,9 -27,10 +27,10 @@@ import org.apache.accumulo.core.client. import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty; import org.apache.accumulo.core.client.impl.ClientContext; import org.apache.accumulo.core.client.security.tokens.KerberosToken; -import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ConfigurationTypeHelper; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.rpc.SslConnectionParams; + import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.minicluster.MiniAccumuloCluster; import org.apache.accumulo.proxy.thrift.AccumuloProxy; import org.apache.accumulo.server.metrics.MetricsFactory; diff --cc server/base/src/test/java/org/apache/accumulo/server/master/balancer/TableLoadBalancerTest.java index 24fed0c,95ece35..012b846 --- a/server/base/src/test/java/org/apache/accumulo/server/master/balancer/TableLoadBalancerTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/master/balancer/TableLoadBalancerTest.java @@@ -36,8 -35,7 +36,9 @@@ import org.apache.accumulo.core.data.im import org.apache.accumulo.core.master.thrift.TableInfo; import org.apache.accumulo.core.master.thrift.TabletServerStatus; import org.apache.accumulo.core.tabletserver.thrift.TabletStats; + import org.apache.accumulo.core.util.HostAndPort; +import org.apache.accumulo.server.AccumuloServerContext; +import org.apache.accumulo.server.conf.NamespaceConfiguration; import org.apache.accumulo.server.conf.ServerConfigurationFactory; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.master.state.TServerInstance; diff --cc 
server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index feafa1c,eef856c..7c9498d --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@@ -21,8 -21,10 +21,9 @@@ import static com.google.common.util.co import java.io.FileNotFoundException; import java.io.IOException; import java.net.UnknownHostException; + import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.SortedMap; @@@ -109,14 -111,11 +111,13 @@@ import org.slf4j.Logger import org.slf4j.LoggerFactory; import com.beust.jcommander.Parameter; -import com.google.common.base.Function; import com.google.common.collect.Iterators; import com.google.common.collect.Maps; - import com.google.common.net.HostAndPort; import com.google.protobuf.InvalidProtocolBufferException; +// Could/Should implement HighlyAvailableService but the Thrift server is already started before +// the ZK lock is acquired. The server is only for metrics, there are no concerns about clients +// using the service before the lock is acquired. 
public class SimpleGarbageCollector extends AccumuloServerContext implements Iface { private static final Text EMPTY_TEXT = new Text(); diff --cc server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java index cbd921f,106a542..1198815 --- a/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java +++ b/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java @@@ -27,9 -27,9 +27,10 @@@ import java.util.Collections import java.util.HashSet; import java.util.List; +import org.apache.accumulo.core.client.impl.Table; import org.apache.accumulo.core.data.impl.KeyExtent; import org.apache.accumulo.core.metadata.RootTable; + import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.server.master.state.Assignment; import org.apache.accumulo.server.master.state.DistributedStore; import org.apache.accumulo.server.master.state.DistributedStoreException; diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/rest/master/MasterResource.java index 4079f24,0000000..445e104 mode 100644,000000..100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/master/MasterResource.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/master/MasterResource.java @@@ -1,238 -1,0 +1,238 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.monitor.rest.master; + +import java.lang.management.ManagementFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +import org.apache.accumulo.core.gc.thrift.GCStatus; +import org.apache.accumulo.core.master.thrift.DeadServer; +import org.apache.accumulo.core.master.thrift.MasterMonitorInfo; +import org.apache.accumulo.core.master.thrift.TabletServerStatus; +import org.apache.accumulo.core.util.AddressUtil; +import org.apache.accumulo.monitor.Monitor; +import org.apache.accumulo.monitor.rest.logs.DeadLoggerInformation; +import org.apache.accumulo.monitor.rest.logs.DeadLoggerList; +import org.apache.accumulo.monitor.rest.tservers.BadTabletServerInformation; +import org.apache.accumulo.monitor.rest.tservers.BadTabletServers; +import org.apache.accumulo.monitor.rest.tservers.DeadServerInformation; +import org.apache.accumulo.monitor.rest.tservers.DeadServerList; +import org.apache.accumulo.monitor.rest.tservers.ServerShuttingDownInformation; +import org.apache.accumulo.monitor.rest.tservers.ServersShuttingDown; +import org.apache.accumulo.server.master.state.TabletServerState; + +/** + * + * Responsible for generating a new Master information JSON object + * + * @since 2.0.0 + * + */ +@Path("/master") +@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) +public class MasterResource { + public static final String NO_MASTERS = "No 
Masters running"; + + /** + * Gets the MasterMonitorInfo, allowing for mocking frameworks for testability + */ + protected static MasterMonitorInfo getMmi() { + return Monitor.getMmi(); + } + + /** + * Generates a master information JSON object + * + * @return master JSON object + */ + @GET + public static MasterInformation getTables() { + + MasterInformation masterInformation; + MasterMonitorInfo mmi = Monitor.getMmi(); + + if (mmi != null) { + GCStatus gcStatusObj = Monitor.getGcStatus(); + String gcStatus = "Waiting"; + String label = ""; + if (gcStatusObj != null) { + long start = 0; + if (gcStatusObj.current.started != 0 || gcStatusObj.currentLog.started != 0) { + start = Math.max(gcStatusObj.current.started, gcStatusObj.currentLog.started); + label = "Running"; + } else if (gcStatusObj.lastLog.finished != 0) { + start = gcStatusObj.lastLog.finished; + } + if (start != 0) { + gcStatus = String.valueOf(start); + } + } else { + gcStatus = "Down"; + } + + List<String> tservers = new ArrayList<>(); + for (TabletServerStatus up : mmi.tServerInfo) { + tservers.add(up.name); + } + for (DeadServer down : mmi.deadTabletServers) { + tservers.add(down.server); + } + List<String> masters = Monitor.getContext().getInstance().getMasterLocations(); + - String master = masters.size() == 0 ? "Down" : AddressUtil.parseAddress(masters.get(0), false).getHostText(); ++ String master = masters.size() == 0 ? 
"Down" : AddressUtil.parseAddress(masters.get(0), false).getHost(); + Integer onlineTabletServers = mmi.tServerInfo.size(); + Integer totalTabletServers = tservers.size(); + Integer tablets = Monitor.getTotalTabletCount(); + Integer unassignedTablets = mmi.unassignedTablets; + long entries = Monitor.getTotalEntries(); + double ingest = Monitor.getTotalIngestRate(); + double entriesRead = Monitor.getTotalScanRate(); + double entriesReturned = Monitor.getTotalQueryRate(); + long holdTime = Monitor.getTotalHoldTime(); + double osLoad = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage(); + + int tables = Monitor.getTotalTables(); + int deadTabletServers = mmi.deadTabletServers.size(); + long lookups = Monitor.getTotalLookups(); + long uptime = System.currentTimeMillis() - Monitor.getStartTime(); + + masterInformation = new MasterInformation(master, onlineTabletServers, totalTabletServers, gcStatus, tablets, unassignedTablets, entries, ingest, + entriesRead, entriesReturned, holdTime, osLoad, tables, deadTabletServers, lookups, uptime, label, getGoalState(), getState(), getNumBadTservers(), + getServersShuttingDown(), getDeadTservers(), getDeadLoggers()); + } else { + masterInformation = new MasterInformation(); + } + return masterInformation; + } + + /** + * Returns the current state of the master + * + * @return master state + */ + public static String getState() { + MasterMonitorInfo mmi = getMmi(); + if (null == mmi) { + return NO_MASTERS; + } + return mmi.state.toString(); + } + + /** + * Returns the goal state of the master + * + * @return master goal state + */ + public static String getGoalState() { + MasterMonitorInfo mmi = getMmi(); + if (null == mmi) { + return NO_MASTERS; + } + return mmi.goalState.name(); + } + + /** + * Generates a dead server list as a JSON object + * + * @return dead server list + */ + public static DeadServerList getDeadTservers() { + MasterMonitorInfo mmi = getMmi(); + if (null == mmi) { + return new DeadServerList(); 
+ } + + DeadServerList deadServers = new DeadServerList(); + // Add new dead servers to the list + for (DeadServer dead : mmi.deadTabletServers) { + deadServers.addDeadServer(new DeadServerInformation(dead.server, dead.lastStatus, dead.status)); + } + return deadServers; + } + + /** + * Generates a dead logger list as a JSON object + * + * @return dead logger list + */ + public static DeadLoggerList getDeadLoggers() { + MasterMonitorInfo mmi = getMmi(); + if (null == mmi) { + return new DeadLoggerList(); + } + + DeadLoggerList deadLoggers = new DeadLoggerList(); + // Add new dead loggers to the list + for (DeadServer dead : mmi.deadTabletServers) { + deadLoggers.addDeadLogger(new DeadLoggerInformation(dead.server, dead.lastStatus, dead.status)); + } + return deadLoggers; + } + + /** + * Generates bad tserver lists as a JSON object + * + * @return bad tserver list + */ + public static BadTabletServers getNumBadTservers() { + MasterMonitorInfo mmi = getMmi(); + if (null == mmi) { + return new BadTabletServers(); + } + + Map<String,Byte> badServers = mmi.getBadTServers(); + + if (null == badServers || badServers.isEmpty()) { + return new BadTabletServers(); + } + + BadTabletServers readableBadServers = new BadTabletServers(); + // Add new bad tservers to the list + for (Entry<String,Byte> badServer : badServers.entrySet()) { + try { + TabletServerState state = TabletServerState.getStateById(badServer.getValue()); + readableBadServers.addBadServer(new BadTabletServerInformation(badServer.getKey(), state.name())); + } catch (IndexOutOfBoundsException e) { + readableBadServers.addBadServer(new BadTabletServerInformation(badServer.getKey(), "Unknown state")); + } + } + return readableBadServers; + } + + /** + * Generates a JSON object of a list of servers shutting down + * + * @return servers shutting down list + */ + public static ServersShuttingDown getServersShuttingDown() { + ServersShuttingDown servers = new ServersShuttingDown(); + // Add new servers to the list + 
for (String server : Monitor.getMmi().serversShuttingDown) { + servers.addServerShuttingDown(new ServerShuttingDownInformation(server)); + } + return servers; + } + +} diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/rest/scans/ScansResource.java index 1b4e139,0000000..9eea795 mode 100644,000000..100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/scans/ScansResource.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/scans/ScansResource.java @@@ -1,64 -1,0 +1,63 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.monitor.rest.scans; + +import java.util.Map; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +import org.apache.accumulo.core.master.thrift.TabletServerStatus; ++import org.apache.accumulo.core.util.HostAndPort; +import org.apache.accumulo.monitor.Monitor; +import org.apache.accumulo.monitor.Monitor.ScanStats; + - import com.google.common.net.HostAndPort; - +/** + * + * Generate a new Scan list JSON object + * + * @since 2.0.0 + * + */ +@Path("/scans") +@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) +public class ScansResource { + + /** + * Generates a new JSON object with scan information + * + * @return Scan JSON object + */ + @GET + public Scans getTables() { + + Scans scans = new Scans(); + + Map<HostAndPort,ScanStats> entry = Monitor.getScans(); + + // Adds new scans to the array + for (TabletServerStatus tserverInfo : Monitor.getMmi().getTServerInfo()) { + ScanStats stats = entry.get(HostAndPort.fromString(tserverInfo.name)); + if (stats != null) { + scans.addScan(new ScanInformation(tserverInfo, stats.scanCount, stats.oldestScan)); + } + } + return scans; + } +} diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/rest/tservers/TabletServerResource.java index 8e5607a,0000000..d96e40d mode 100644,000000..100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/tservers/TabletServerResource.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/tservers/TabletServerResource.java @@@ -1,329 -1,0 +1,328 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.monitor.rest.tservers; + +import java.lang.management.ManagementFactory; +import java.security.MessageDigest; +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; + +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response.Status; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.impl.ClientContext; +import org.apache.accumulo.core.client.impl.Table; +import org.apache.accumulo.core.client.impl.Tables; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.impl.KeyExtent; +import org.apache.accumulo.core.master.thrift.MasterMonitorInfo; +import org.apache.accumulo.core.master.thrift.RecoveryStatus; +import org.apache.accumulo.core.master.thrift.TabletServerStatus; +import org.apache.accumulo.core.rpc.ThriftUtil; +import org.apache.accumulo.core.tabletserver.thrift.ActionStats; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; +import org.apache.accumulo.core.tabletserver.thrift.TabletStats; +import org.apache.accumulo.core.trace.Tracer; +import org.apache.accumulo.core.util.AddressUtil; ++import org.apache.accumulo.core.util.HostAndPort; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.monitor.Monitor; +import 
org.apache.accumulo.monitor.rest.master.MasterResource; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.master.state.DeadServerList; +import org.apache.accumulo.server.util.ActionStatsUpdator; + - import com.google.common.net.HostAndPort; - +/** + * + * Generates tserver lists as JSON objects + * + * @since 2.0.0 + * + */ +@Path("/tservers") +@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) +public class TabletServerResource { + + // Variable names become JSON keys + private TabletStats total; + private TabletStats historical; + + /** + * Generates tserver summary + * + * @return tserver summary + */ + @GET + public TabletServers getTserverSummary() { + MasterMonitorInfo mmi = Monitor.getMmi(); + if (null == mmi) { + throw new WebApplicationException(Status.INTERNAL_SERVER_ERROR); + } + + TabletServers tserverInfo = new TabletServers(mmi.tServerInfo.size()); + for (TabletServerStatus status : mmi.tServerInfo) { + tserverInfo.addTablet(new TabletServer(status)); + } + + tserverInfo.addBadTabletServer(MasterResource.getTables()); + + return tserverInfo; + } + + /** + * REST call to clear dead servers from list + * + * @param server + * Dead server to clear + */ + @POST + @Consumes(MediaType.TEXT_PLAIN) + public void clearDeadServer(@QueryParam("server") String server) throws Exception { + DeadServerList obit = new DeadServerList(ZooUtil.getRoot(Monitor.getContext().getInstance()) + Constants.ZDEADTSERVERS); + obit.delete(server); + } + + /** + * Generates a recovery tserver list + * + * @return Recovery tserver list + */ + @Path("recovery") + @GET + public TabletServersRecovery getTserverRecovery() { + TabletServersRecovery recoveryList = new TabletServersRecovery(); + + MasterMonitorInfo mmi = Monitor.getMmi(); + if (null == mmi) { + throw new WebApplicationException(Status.INTERNAL_SERVER_ERROR); + } + + for (TabletServerStatus server : mmi.tServerInfo) { + if (server.logSorts != null) { + for 
(RecoveryStatus recovery : server.logSorts) { - String serv = AddressUtil.parseAddress(server.name, false).getHostText(); ++ String serv = AddressUtil.parseAddress(server.name, false).getHost(); + String log = recovery.name; + int time = recovery.runtime; + double copySort = recovery.progress; + + recoveryList.addRecovery(new TabletServerRecoveryInformation(serv, log, time, copySort)); + } + } + } + + return recoveryList; + } + + /** + * Generates details for the selected tserver + * + * @param tserverAddr + * TServer name + * @return TServer details + */ + @Path("{address}") + @GET + public TabletServerSummary getTserverDetails(@PathParam("address") String tserverAddr) throws Exception { + + String tserverAddress = tserverAddr; + + boolean tserverExists = false; + if (tserverAddress != null && tserverAddress.isEmpty() == false) { + for (TabletServerStatus ts : Monitor.getMmi().getTServerInfo()) { + if (tserverAddress.equals(ts.getName())) { + tserverExists = true; + break; + } + } + } + + if (tserverAddress == null || tserverAddress.isEmpty() || tserverExists == false) { + + return null; + } + + double totalElapsedForAll = 0; + double splitStdDev = 0; + double minorStdDev = 0; + double minorQueueStdDev = 0; + double majorStdDev = 0; + double majorQueueStdDev = 0; + double currentMinorAvg = 0; + double currentMajorAvg = 0; + double currentMinorStdDev = 0; + double currentMajorStdDev = 0; + total = new TabletStats(null, new ActionStats(), new ActionStats(), new ActionStats(), 0, 0, 0, 0); + HostAndPort address = HostAndPort.fromString(tserverAddress); + historical = new TabletStats(null, new ActionStats(), new ActionStats(), new ActionStats(), 0, 0, 0, 0); + List<TabletStats> tsStats = new ArrayList<>(); + + try { + ClientContext context = Monitor.getContext(); + TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, context); + try { + for (String tableId : Monitor.getMmi().tableMap.keySet()) { + 
tsStats.addAll(client.getTabletStats(Tracer.traceInfo(), context.rpcCreds(), tableId)); + } + historical = client.getHistoricalStats(Tracer.traceInfo(), context.rpcCreds()); + } finally { + ThriftUtil.returnClient(client); + } + } catch (Exception e) { + return null; + } + + List<CurrentOperations> currentOps = doCurrentOperations(tsStats); + + if (total.minors.num != 0) + currentMinorAvg = (long) (total.minors.elapsed / total.minors.num); + if (total.minors.elapsed != 0 && total.minors.num != 0) + currentMinorStdDev = stddev(total.minors.elapsed, total.minors.num, total.minors.sumDev); + if (total.majors.num != 0) + currentMajorAvg = total.majors.elapsed / total.majors.num; + if (total.majors.elapsed != 0 && total.majors.num != 0 && total.majors.elapsed > total.majors.num) + currentMajorStdDev = stddev(total.majors.elapsed, total.majors.num, total.majors.sumDev); + + ActionStatsUpdator.update(total.minors, historical.minors); + ActionStatsUpdator.update(total.majors, historical.majors); + totalElapsedForAll += total.majors.elapsed + historical.splits.elapsed + total.minors.elapsed; + + minorStdDev = stddev(total.minors.elapsed, total.minors.num, total.minors.sumDev); + minorQueueStdDev = stddev(total.minors.queueTime, total.minors.num, total.minors.queueSumDev); + majorStdDev = stddev(total.majors.elapsed, total.majors.num, total.majors.sumDev); + majorQueueStdDev = stddev(total.majors.queueTime, total.majors.num, total.majors.queueSumDev); + splitStdDev = stddev(historical.splits.num, historical.splits.elapsed, historical.splits.sumDev); + + TabletServerDetailInformation details = doDetails(address, tsStats.size()); + + List<AllTimeTabletResults> allTime = doAllTimeResults(majorQueueStdDev, minorQueueStdDev, totalElapsedForAll, splitStdDev, majorStdDev, minorStdDev); + + CurrentTabletResults currentRes = doCurrentTabletResults(currentMinorAvg, currentMinorStdDev, currentMajorAvg, currentMajorStdDev); + + TabletServerSummary tserverDetails = new 
TabletServerSummary(details, allTime, currentRes, currentOps); + + return tserverDetails; + } + + private static final int concurrentScans = Monitor.getContext().getConfiguration().getCount(Property.TSERV_READ_AHEAD_MAXCONCURRENT); + + /** + * Generates the server stats + * + * @return Server stat list + */ + @Path("serverStats") + @GET + public ServerStats getServerStats() { + + ServerStats stats = new ServerStats(); + + stats.addStats(new ServerStat(ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors(), true, 100, "OS Load", "osload")); + stats.addStats(new ServerStat(1000, true, 1, "Ingest Entries", "ingest")); + stats.addStats(new ServerStat(10000, true, 1, "Scan Entries", "query")); + stats.addStats(new ServerStat(10, true, 10, "Ingest MB", "ingestMB")); + stats.addStats(new ServerStat(5, true, 10, "Scan MB", "queryMB")); + stats.addStats(new ServerStat(concurrentScans * 2, false, 1, "Running Scans", "scans")); + stats.addStats(new ServerStat(50, true, 10, "Scan Sessions", "scansessions")); + stats.addStats(new ServerStat(60000, false, 1, "Hold Time", "holdtime")); + stats.addStats(new ServerStat(1, false, 100, "Overall Avg", true, "allavg")); + stats.addStats(new ServerStat(1, false, 100, "Overall Max", true, "allmax")); + + return stats; + } + + private TabletServerDetailInformation doDetails(HostAndPort address, int numTablets) { + + return new TabletServerDetailInformation(numTablets, total.numEntries, total.minors.status, total.majors.status, historical.splits.status); + } + + private List<AllTimeTabletResults> doAllTimeResults(double majorQueueStdDev, double minorQueueStdDev, double totalElapsedForAll, double splitStdDev, + double majorStdDev, double minorStdDev) { + + List<AllTimeTabletResults> allTime = new ArrayList<>(); + + // Minor Compaction Operation + allTime.add(new AllTimeTabletResults("Minor Compaction", total.minors.num, total.minors.fail, + total.minors.num != 0 ? 
(total.minors.queueTime / total.minors.num) : null, minorQueueStdDev, + total.minors.num != 0 ? (total.minors.elapsed / total.minors.num) : null, minorStdDev, total.minors.elapsed)); + + // Major Compaction Operation + allTime.add(new AllTimeTabletResults("Major Compaction", total.majors.num, total.majors.fail, + total.majors.num != 0 ? (total.majors.queueTime / total.majors.num) : null, majorQueueStdDev, + total.majors.num != 0 ? (total.majors.elapsed / total.majors.num) : null, majorStdDev, total.majors.elapsed)); + // Split Operation + allTime.add(new AllTimeTabletResults("Split", historical.splits.num, historical.splits.fail, null, null, + historical.splits.num != 0 ? (historical.splits.elapsed / historical.splits.num) : null, splitStdDev, historical.splits.elapsed)); + + return allTime; + } + + private CurrentTabletResults doCurrentTabletResults(double currentMinorAvg, double currentMinorStdDev, double currentMajorAvg, double currentMajorStdDev) { + + return new CurrentTabletResults(currentMinorAvg, currentMinorStdDev, currentMajorAvg, currentMajorStdDev); + } + + private List<CurrentOperations> doCurrentOperations(List<TabletStats> tsStats) throws Exception { + + List<CurrentOperations> currentOperations = new ArrayList<>(); + + for (TabletStats info : tsStats) { + if (info.extent == null) { + historical = info; + continue; + } + total.numEntries += info.numEntries; + ActionStatsUpdator.update(total.minors, info.minors); + ActionStatsUpdator.update(total.majors, info.majors); + + KeyExtent extent = new KeyExtent(info.extent); + Table.ID tableId = extent.getTableId(); + MessageDigest digester = MessageDigest.getInstance("MD5"); + if (extent.getEndRow() != null && extent.getEndRow().getLength() > 0) { + digester.update(extent.getEndRow().getBytes(), 0, extent.getEndRow().getLength()); + } + String obscuredExtent = Base64.getEncoder().encodeToString(digester.digest()); + String displayExtent = String.format("[%s]", obscuredExtent); + + String tableName = 
Tables.getPrintableTableInfoFromId(HdfsZooInstance.getInstance(), tableId); + + currentOperations.add(new CurrentOperations(tableName, tableId, displayExtent, info.numEntries, info.ingestRate, info.queryRate, + info.minors.num != 0 ? info.minors.elapsed / info.minors.num : null, stddev(info.minors.elapsed, info.minors.num, info.minors.sumDev), + info.minors.elapsed != 0 ? info.minors.count / info.minors.elapsed : null, info.majors.num != 0 ? info.majors.elapsed / info.majors.num : null, + stddev(info.majors.elapsed, info.majors.num, info.majors.sumDev), info.majors.elapsed != 0 ? info.majors.count / info.majors.elapsed : null)); + } + + return currentOperations; + } + + private static double stddev(double elapsed, double num, double sumDev) { + if (num != 0) { + double average = elapsed / num; + return Math.sqrt((sumDev / num) - (average * average)); + } + return 0; + } +} diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppender.java index a965f0c,0000000..5a24e4f mode 100644,000000..100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppender.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppender.java @@@ -1,222 -1,0 +1,221 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.monitor.util; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.conf.Property; ++import org.apache.accumulo.core.util.HostAndPort; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.ZooCache; +import org.apache.accumulo.fate.zookeeper.ZooCacheFactory; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.AsyncAppender; +import org.apache.log4j.net.SocketAppender; +import org.apache.zookeeper.data.Stat; + - import com.google.common.net.HostAndPort; - +public class AccumuloMonitorAppender extends AsyncAppender implements AutoCloseable { + + final ScheduledExecutorService executorService; + final AtomicBoolean trackerScheduled; + private int frequency = 0; + private MonitorTracker tracker = null; + + /** + * A Log4j Appender which follows the registered location of the active Accumulo monitor service, and forwards log messages to it + */ + public AccumuloMonitorAppender() { + // create the background thread to watch for updates to monitor location + trackerScheduled = new AtomicBoolean(false); + executorService = Executors.newSingleThreadScheduledExecutor(runnable -> { + Thread t = new Thread(runnable, AccumuloMonitorAppender.class.getSimpleName() + " Location Tracker"); + t.setDaemon(true); + return t; + }); + } + + public void setFrequency(int millis) { + if (millis > 0) { + frequency = millis; + 
} + } + + public int getFrequency() { + return frequency; + } + + // this is just for testing + void setTracker(MonitorTracker monitorTracker) { + tracker = monitorTracker; + } + + @Override + public void activateOptions() { + // only schedule it once (in case options get activated more than once); not sure if this is possible + if (trackerScheduled.compareAndSet(false, true)) { + if (frequency <= 0) { + // use default rate of 5 seconds between each check + frequency = 5000; + } + if (tracker == null) { + tracker = new MonitorTracker(this, new ZooCacheLocationSupplier(), new SocketAppenderFactory()); + } + executorService.scheduleWithFixedDelay(tracker, frequency, frequency, TimeUnit.MILLISECONDS); + } + super.activateOptions(); + } + + @Override + public void close() { + if (!executorService.isShutdown()) { + executorService.shutdownNow(); + } + super.close(); + } + + static class MonitorLocation { + private final String location; + private final long modId; + + public MonitorLocation(long modId, byte[] location) { + this.modId = modId; + this.location = location == null ? 
null : new String(location, UTF_8); + } + + public boolean hasLocation() { + return location != null; + } + + public String getLocation() { + return location; + } + + @Override + public boolean equals(Object obj) { + if (obj != null && obj instanceof MonitorLocation) { + MonitorLocation other = (MonitorLocation) obj; + return modId == other.modId && Objects.equals(location, other.location); + } + return false; + } + + @Override + public int hashCode() { + return Long.hashCode(modId); + } + } + + private static class ZooCacheLocationSupplier implements Supplier<MonitorLocation> { + + // path and zooCache are lazily set the first time this tracker is run + // this allows the tracker to be constructed and scheduled during log4j initialization without + // triggering any actual logs from the Accumulo or ZooKeeper code + private String path = null; + private ZooCache zooCache = null; + + @Override + public MonitorLocation get() { + // lazily set up path and zooCache (see comment in constructor) + if (this.zooCache == null) { + Instance instance = HdfsZooInstance.getInstance(); + this.path = ZooUtil.getRoot(instance) + Constants.ZMONITOR_LOG4J_ADDR; + this.zooCache = new ZooCacheFactory().getZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()); + } + + // get the current location from the cache and update if necessary + Stat stat = new Stat(); + byte[] loc = zooCache.get(path, stat); + // mzxid is 0 if location does not exist and the non-zero transaction id of the last modification otherwise + return new MonitorLocation(stat.getMzxid(), loc); + } + } + + private static class SocketAppenderFactory implements Function<MonitorLocation,AppenderSkeleton> { + @Override + public AppenderSkeleton apply(MonitorLocation loc) { + int defaultPort = Integer.parseUnsignedInt(Property.MONITOR_LOG4J_PORT.getDefaultValue()); + HostAndPort remote = HostAndPort.fromString(loc.getLocation()); + + SocketAppender socketAppender = new SocketAppender(); + 
socketAppender.setApplication(System.getProperty("accumulo.application", "unknown")); - socketAppender.setRemoteHost(remote.getHostText()); ++ socketAppender.setRemoteHost(remote.getHost()); + socketAppender.setPort(remote.getPortOrDefault(defaultPort)); + + return socketAppender; + } + } + + static class MonitorTracker implements Runnable { + + private final AccumuloMonitorAppender parentAsyncAppender; + private final Supplier<MonitorLocation> currentLocationSupplier; + private final Function<MonitorLocation,AppenderSkeleton> appenderFactory; + + private MonitorLocation lastLocation; + private AppenderSkeleton lastSocketAppender; + + public MonitorTracker(AccumuloMonitorAppender appender, Supplier<MonitorLocation> currentLocationSupplier, + Function<MonitorLocation,AppenderSkeleton> appenderFactory) { + this.parentAsyncAppender = Objects.requireNonNull(appender); + this.appenderFactory = Objects.requireNonNull(appenderFactory); + this.currentLocationSupplier = Objects.requireNonNull(currentLocationSupplier); + + this.lastLocation = new MonitorLocation(0, null); + this.lastSocketAppender = null; + } + + @Override + public void run() { + try { + MonitorLocation currentLocation = currentLocationSupplier.get(); + // detect change + if (!currentLocation.equals(lastLocation)) { + // clean up old appender + if (lastSocketAppender != null) { + parentAsyncAppender.removeAppender(lastSocketAppender); + lastSocketAppender.close(); + lastSocketAppender = null; + } + // create a new one + if (currentLocation.hasLocation()) { + lastSocketAppender = appenderFactory.apply(currentLocation); + lastSocketAppender.activateOptions(); + parentAsyncAppender.addAppender(lastSocketAppender); + } + // update the last location only if switching was successful + lastLocation = currentLocation; + } + } catch (Exception e) { + // dump any non-fatal problems to the console, but let it run again + e.printStackTrace(); + } + } + + } + +} diff --cc 
server/monitor/src/main/java/org/apache/accumulo/monitor/view/WebViews.java index 4f0c92d,0000000..140a7f5 mode 100644,000000..100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/view/WebViews.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/view/WebViews.java @@@ -1,384 -1,0 +1,384 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.monitor.view; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.MediaType; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.impl.Table; +import org.apache.accumulo.core.client.impl.Tables; +import org.apache.accumulo.core.util.AddressUtil; +import org.apache.accumulo.monitor.Monitor; +import org.glassfish.jersey.server.mvc.Template; + +/** + * + * Index is responsible of specifying Monitor paths and setting the templates for the HTML code + * + * @since 2.0.0 + * + */ +@Path("/") +@Produces(MediaType.TEXT_HTML) +public class WebViews { + + private Map<String,Object> getModel() { + + Map<String,Object> model = new HashMap<>(); + model.put("version", Constants.VERSION); + model.put("instance_name", Monitor.cachedInstanceName.get()); + model.put("instance_id", Monitor.getContext().getInstance().getInstanceID()); + return model; + } + + /** + * Returns the overview template + * + * @return Overview model + */ + @GET + @Template(name = "/default.ftl") + public Map<String,Object> get() { + + Map<String,Object> model = getModel(); + model.put("title", "Accumulo Overview"); + model.put("template", "overview.ftl"); + model.put("js", "overview.js"); + + return model; + } + + /** + * Returns the master template + * + * @return Master model + */ + @GET + @Path("{parameter: master|monitor}") + @Template(name = "/default.ftl") + public Map<String,Object> getMaster() { + + List<String> masters = Monitor.getContext().getInstance().getMasterLocations(); + + Map<String,Object> model = getModel(); - model.put("title", "Master Server" + (masters.size() == 0 ? 
"" : ":" + AddressUtil.parseAddress(masters.get(0), false).getHostText())); ++ model.put("title", "Master Server" + (masters.size() == 0 ? "" : ":" + AddressUtil.parseAddress(masters.get(0), false).getHost())); + model.put("template", "master.ftl"); + model.put("js", "master.js"); + + model.put("tablesTitle", "Table Status"); + model.put("tablesTemplate", "tables.ftl"); + model.put("tablesJs", "tables.js"); + return model; + } + + /** + * Returns the tservers templates + * + * @param server + * TServer to show details + * @return tserver model + */ + @GET + @Path("tservers") + @Template(name = "/default.ftl") + public Map<String,Object> getTabletServers(@QueryParam("s") String server) { + + Map<String,Object> model = getModel(); + model.put("title", "Tablet Server Status"); + if (server != null) { + model.put("template", "server.ftl"); + model.put("js", "server.js"); + model.put("server", server); + return model; + } + model.put("template", "tservers.ftl"); + model.put("js", "tservers.js"); + return model; + } + + /** + * Returns the scans template + * + * @return Scans model + */ + @GET + @Path("scans") + @Template(name = "/default.ftl") + public Map<String,Object> getScans() { + + Map<String,Object> model = getModel(); + model.put("title", "Scans"); + model.put("template", "scans.ftl"); + model.put("js", "scans.js"); + + return model; + } + + /** + * Returns the bulk import template + * + * @return Bulk Import model + */ + @GET + @Path("bulkImports") + @Template(name = "/default.ftl") + public Map<String,Object> getBulkImports() { + + Map<String,Object> model = getModel(); + model.put("title", "Bulk Imports"); + model.put("template", "bulkImport.ftl"); + model.put("js", "bulkImport.js"); + + return model; + } + + /** + * Returns the garbage collector template + * + * @return GC model + */ + @GET + @Path("gc") + @Template(name = "/default.ftl") + public Map<String,Object> getGC() { + + Map<String,Object> model = getModel(); + model.put("title", "Garbage Collector 
Status"); + model.put("template", "gc.ftl"); + model.put("js", "gc.js"); + + return model; + } + + /** + * Returns the server activity template + * + * @param shape + * Shape of visualization + * @param size + * Size of visualization + * @param motion + * Motion of visualization + * @param color + * Color of visualization + * @return Server activity model + */ + @GET + @Path("vis") + @Template(name = "/default.ftl") + public Map<String,Object> getServerActivity(@QueryParam("shape") @DefaultValue("circles") String shape, @QueryParam("size") @DefaultValue("40") String size, + @QueryParam("motion") @DefaultValue("") String motion, @QueryParam("color") @DefaultValue("allavg") String color) { + + Map<String,Object> model = getModel(); + model.put("title", "Server Activity"); + model.put("template", "vis.ftl"); + + model.put("shape", shape); + model.put("size", size); + model.put("motion", motion); + model.put("color", color); + + return model; + } + + /** + * Returns the tables template + * + * @return Tables model + */ + @GET + @Path("tables") + @Template(name = "/default.ftl") + public Map<String,Object> getTables() throws TableNotFoundException { + + Map<String,Object> model = getModel(); + model.put("title", "Table Status"); // Need this for the browser tab title + model.put("tablesTitle", "Table Status"); + model.put("template", "tables.ftl"); + model.put("js", "tables.js"); + + return model; + } + + /** + * Returns participating tservers template + * + * @param tableID + * Table ID for participating tservers + * @return Participating tservers model + */ + @GET + @Path("tables/{tableID}") + @Template(name = "/default.ftl") + public Map<String,Object> getTables(@PathParam("tableID") String tableID) throws TableNotFoundException { + + String tableName = Tables.getTableName(Monitor.getContext().getInstance(), Table.ID.of(tableID)); + + Map<String,Object> model = getModel(); + model.put("title", "Table Status"); + + model.put("template", "table.ftl"); + model.put("js", 
"table.js"); + model.put("tableID", tableID); + model.put("table", tableName); + + return model; + } + + /** + * Returns trace summary template + * + * @param minutes + * Range of minutes + * @return Trace summary model + */ + @GET + @Path("trace/summary") + @Template(name = "/default.ftl") + public Map<String,Object> getTracesSummary(@QueryParam("minutes") @DefaultValue("10") String minutes) { + + Map<String,Object> model = getModel(); + model.put("title", "Traces for the last " + minutes + " minute(s)"); + + model.put("template", "summary.ftl"); + model.put("js", "summary.js"); + model.put("minutes", minutes); + + return model; + } + + /** + * Returns traces by type template + * + * @param type + * Type of trace + * @param minutes + * Range of minutes + * @return Traces by type model + */ + @GET + @Path("trace/listType") + @Template(name = "/default.ftl") + public Map<String,Object> getTracesForType(@QueryParam("type") String type, @QueryParam("minutes") @DefaultValue("10") String minutes) { + + Map<String,Object> model = getModel(); + model.put("title", "Traces for " + type + " for the last " + minutes + " minute(s)"); + + model.put("template", "listType.ftl"); + model.put("js", "listType.js"); + model.put("type", type); + model.put("minutes", minutes); + + return model; + } + + /** + * Returns traces by ID template + * + * @param id + * ID of the traces + * @return Traces by ID model + */ + @GET + @Path("trace/show") + @Template(name = "/default.ftl") + public Map<String,Object> getTraceShow(@QueryParam("id") String id) { + + Map<String,Object> model = getModel(); + model.put("title", "Trace ID " + id); + + model.put("template", "show.ftl"); + model.put("js", "show.js"); + model.put("id", id); + + return model; + } + + /** + * Returns log report template + * + * @return Log report model + */ + @GET + @Path("log") + @Template(name = "/default.ftl") + public Map<String,Object> getLogs() { + + Map<String,Object> model = getModel(); + model.put("title", "Recent 
Logs"); + + model.put("template", "log.ftl"); + model.put("js", "log.js"); + + return model; + } + + /** + * Returns problem report template + * + * @param table + * Table ID to display problem details + * @return Problem report model + */ + @GET + @Path("problems") + @Template(name = "/default.ftl") + public Map<String,Object> getProblems(@QueryParam("table") String table) { + + Map<String,Object> model = getModel(); + model.put("title", "Per-Table Problem Report"); + + model.put("template", "problems.ftl"); + model.put("js", "problems.js"); + + if (table != null) { + model.put("table", table); + } + + return model; + } + + /** + * Returns replication table template + * + * @return Replication model + */ + @GET + @Path("replication") + @Template(name = "/default.ftl") + public Map<String,Object> getReplication() { + + Map<String,Object> model = getModel(); + model.put("title", "Replication Overview"); + + model.put("template", "replication.ftl"); + model.put("js", "replication.js"); + + return model; + } +} diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 4d847d1,6eaea59..ebb0bfe --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@@ -2439,23 -2317,23 +2438,23 @@@ public class TabletServer extends Accum } else { processor = new Processor<>(rpcProxy); } - HostAndPort address = startServer(getServerConfigurationFactory().getSystemConfiguration(), clientAddress.getHostText(), Property.TSERV_CLIENTPORT, - processor, "Thrift Client Server"); - HostAndPort address = startServer(getServerConfigurationFactory().getConfiguration(), clientAddress.getHost(), Property.TSERV_CLIENTPORT, processor, ++ HostAndPort address = startServer(getServerConfigurationFactory().getSystemConfiguration(), clientAddress.getHost(), Property.TSERV_CLIENTPORT, processor, + "Thrift Client Server"); - log.info("address = " + address); + 
log.info("address = {}", address); return address; } private HostAndPort startReplicationService() throws UnknownHostException { final ReplicationServicerHandler handler = new ReplicationServicerHandler(this); - ReplicationServicer.Iface rpcProxy = RpcWrapper.service(handler, new ReplicationServicer.Processor<ReplicationServicer.Iface>(handler)); + ReplicationServicer.Iface rpcProxy = RpcWrapper.service(handler); ReplicationServicer.Iface repl = TCredentialsUpdatingWrapper.service(rpcProxy, handler.getClass(), getConfiguration()); ReplicationServicer.Processor<ReplicationServicer.Iface> processor = new ReplicationServicer.Processor<>(repl); - AccumuloConfiguration conf = getServerConfigurationFactory().getConfiguration(); + AccumuloConfiguration conf = getServerConfigurationFactory().getSystemConfiguration(); Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null ? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE); - ServerAddress sp = TServerUtils.startServer(this, clientAddress.getHostText(), Property.REPLICATION_RECEIPT_SERVICE_PORT, processor, + ServerAddress sp = TServerUtils.startServer(this, clientAddress.getHost(), Property.REPLICATION_RECEIPT_SERVICE_PORT, processor, "ReplicationServicerHandler", "Replication Servicer", null, Property.REPLICATION_MIN_THREADS, Property.REPLICATION_THREADCHECK, maxMessageSizeProperty); this.replServer = sp.server; - log.info("Started replication service on " + sp.address); + log.info("Started replication service on {}", sp.address); try { // The replication service is unique to the thrift service for a tserver, not just a host. 
diff --cc test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java index 7598af7,cb2af2a..93ff392 --- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java @@@ -85,8 -76,11 +85,8 @@@ import org.apache.accumulo.server.zooke import org.apache.thrift.TException; import com.beust.jcommander.Parameter; - import com.google.common.net.HostAndPort; + import org.apache.accumulo.core.util.HostAndPort; -import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; -import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal; - /** * The purpose of this class is to server as fake tserver that is a data sink like /dev/null. NullTserver modifies the metadata location entries for a table to * point to it. This allows thrift performance to be measured by running any client code that writes to a table. diff --cc test/src/main/java/org/apache/accumulo/test/proxy/SimpleProxyBase.java index ec962d1,513132d..0f919a6 --- a/test/src/main/java/org/apache/accumulo/test/proxy/SimpleProxyBase.java +++ b/test/src/main/java/org/apache/accumulo/test/proxy/SimpleProxyBase.java @@@ -64,6 -64,9 +64,7 @@@ import org.apache.accumulo.core.iterato import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.ByteBufferUtil; + import org.apache.accumulo.core.util.HostAndPort; -import org.apache.accumulo.examples.simple.constraints.MaxMutationSize; -import org.apache.accumulo.examples.simple.constraints.NumericValueConstraint; import org.apache.accumulo.harness.MiniClusterHarness; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.harness.TestingKdc; diff --cc test/src/main/java/org/apache/accumulo/test/proxy/TestProxyNamespaceOperations.java index 5e76978,ce142e0..21cb599 --- 
a/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyNamespaceOperations.java +++ b/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyNamespaceOperations.java @@@ -28,8 -28,9 +28,9 @@@ import java.util.HashSet import java.util.Properties; import java.util.Set; -import org.apache.accumulo.core.client.impl.Namespaces; +import org.apache.accumulo.core.client.impl.Namespace; import org.apache.accumulo.core.client.security.tokens.PasswordToken; + import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.proxy.Proxy; import org.apache.accumulo.proxy.thrift.AccumuloException; import org.apache.accumulo.proxy.thrift.IteratorScope; -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
