Repository: cassandra
Updated Branches:
  refs/heads/trunk 961b5c9f4 -> 21448c508
Add a command to see if a Materialized View has finished building

Patch by Carl Yeksigian; reviewed by Joel Knighton for CASSANDRA-9967

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/21448c50
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/21448c50
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/21448c50

Branch: refs/heads/trunk
Commit: 21448c50891642f95097a9e5ed0a3802bd90a877
Parents: 961b5c9
Author: Carl Yeksigian <[email protected]>
Authored: Thu Mar 31 16:28:20 2016 -0400
Committer: Carl Yeksigian <[email protected]>
Committed: Thu Mar 31 16:28:20 2016 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/cql3/QueryProcessor.java   |  8 +-
 .../org/apache/cassandra/db/SystemKeyspace.java | 31 +++++--
 .../apache/cassandra/db/view/ViewBuilder.java   | 32 +++++++-
 .../apache/cassandra/db/view/ViewManager.java   |  2 +
 .../repair/SystemDistributedKeyspace.java       | 86 +++++++++++++++++++-
 .../cassandra/service/CassandraDaemon.java      | 26 +++---
 .../cassandra/service/StorageService.java       | 19 +++++
 .../cassandra/service/StorageServiceMBean.java  |  2 +
 .../org/apache/cassandra/tools/NodeProbe.java   |  5 ++
 .../org/apache/cassandra/tools/NodeTool.java    |  3 +-
 .../tools/nodetool/ViewBuildStatus.java         | 84 +++++++++++++++++++
 12 files changed, 267 insertions(+), 32 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/21448c50/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f8a4141..0401357 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.6
+ * Add a command to see if a Materialized View has finished building (CASSANDRA-9967)
  * Log endpoint and port associated with streaming operation (CASSANDRA-8777)
  * Print sensible units for all log messages 
(CASSANDRA-9692) * Upgrade Netty to version 4.0.34 (CASSANDRA-11096) http://git-wip-us.apache.org/repos/asf/cassandra/blob/21448c50/src/java/org/apache/cassandra/cql3/QueryProcessor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java index 9801b41..1fd8564 100644 --- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@ -299,13 +299,19 @@ public class QueryProcessor implements QueryHandler return null; } + public static UntypedResultSet execute(String query, ConsistencyLevel cl, Object... values) + throws RequestExecutionException + { + return execute(query, cl, internalQueryState(), values); + } + public static UntypedResultSet execute(String query, ConsistencyLevel cl, QueryState state, Object... values) throws RequestExecutionException { try { ParsedStatement.Prepared prepared = prepareInternal(query); - ResultMessage result = prepared.statement.execute(state, makeInternalOptions(prepared, values)); + ResultMessage result = prepared.statement.execute(state, makeInternalOptions(prepared, values, cl)); if (result instanceof ResultMessage.Rows) return UntypedResultSet.create(((ResultMessage.Rows)result).result); else http://git-wip-us.apache.org/repos/asf/cassandra/blob/21448c50/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index a51ade0..1427de0 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -262,6 +262,7 @@ public final class SystemKeyspace "CREATE TABLE %s (" + "keyspace_name text," + "view_name text," + + "status_replicated boolean," + "PRIMARY KEY ((keyspace_name), 
view_name))"); @Deprecated @@ -536,13 +537,23 @@ public final class SystemKeyspace return !result.isEmpty(); } - public static void setViewBuilt(String keyspaceName, String viewName) + public static boolean isViewStatusReplicated(String keyspaceName, String viewName) { - String req = "INSERT INTO %s.\"%s\" (keyspace_name, view_name) VALUES (?, ?)"; - executeInternal(String.format(req, NAME, BUILT_VIEWS), keyspaceName, viewName); - forceBlockingFlush(BUILT_VIEWS); + String req = "SELECT status_replicated FROM %s.\"%s\" WHERE keyspace_name=? AND view_name=?"; + UntypedResultSet result = executeInternal(String.format(req, NAME, BUILT_VIEWS), keyspaceName, viewName); + + if (result.isEmpty()) + return false; + UntypedResultSet.Row row = result.one(); + return row.has("status_replicated") && row.getBoolean("status_replicated"); } + public static void setViewBuilt(String keyspaceName, String viewName, boolean replicated) + { + String req = "INSERT INTO %s.\"%s\" (keyspace_name, view_name, status_replicated) VALUES (?, ?, ?)"; + executeInternal(String.format(req, NAME, BUILT_VIEWS), keyspaceName, viewName, replicated); + forceBlockingFlush(BUILT_VIEWS); + } public static void setViewRemoved(String keyspaceName, String viewName) { @@ -568,14 +579,18 @@ public final class SystemKeyspace // We flush the view built first, because if we fail now, we'll restart at the last place we checkpointed // view build. // If we flush the delete first, we'll have to restart from the beginning. - // Also, if the build succeeded, but the view build failed, we will be able to skip the view build check - // next boot. - setViewBuilt(ksname, viewName); - forceBlockingFlush(BUILT_VIEWS); + // Also, if writing to the built_view succeeds, but the view_builds_in_progress deletion fails, we will be able + // to skip the view build next boot. + setViewBuilt(ksname, viewName, false); executeInternal(String.format("DELETE FROM system.%s WHERE keyspace_name = ? 
AND view_name = ?", VIEWS_BUILDS_IN_PROGRESS), ksname, viewName); forceBlockingFlush(VIEWS_BUILDS_IN_PROGRESS); } + public static void setViewBuiltReplicated(String ksname, String viewName) + { + setViewBuilt(ksname, viewName, true); + } + public static void updateViewBuildStatus(String ksname, String viewName, Token token) { String req = "INSERT INTO system.%s (keyspace_name, view_name, last_token) VALUES (?, ?, ?)"; http://git-wip-us.apache.org/repos/asf/cassandra/blob/21448c50/src/java/org/apache/cassandra/db/view/ViewBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java index 8146211..23eeba4 100644 --- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java +++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java @@ -42,6 +42,7 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.ReducingKeyIterator; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.repair.SystemDistributedKeyspace; import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.pager.QueryPager; @@ -102,10 +103,15 @@ public class ViewBuilder extends CompactionInfo.Holder public void run() { + UUID localHostId = SystemKeyspace.getLocalHostId(); String ksname = baseCfs.metadata.ksName, viewName = view.name; if (SystemKeyspace.isViewBuilt(ksname, viewName)) + { + if (!SystemKeyspace.isViewStatusReplicated(ksname, viewName)) + updateDistributed(ksname, viewName, localHostId); return; + } Iterable<Range<Token>> ranges = StorageService.instance.getLocalRanges(baseCfs.metadata.ksName); final Pair<Integer, Token> buildStatus = SystemKeyspace.getViewBuildStatus(ksname, viewName); @@ -148,6 +154,7 @@ public class ViewBuilder extends CompactionInfo.Holder try 
(Refs<SSTableReader> sstables = baseCfs.selectAndReference(function).refs; ReducingKeyIterator iter = new ReducingKeyIterator(sstables)) { + SystemDistributedKeyspace.startViewBuild(ksname, viewName, localHostId); while (!isStopped && iter.hasNext()) { DecoratedKey key = iter.next(); @@ -172,19 +179,36 @@ public class ViewBuilder extends CompactionInfo.Holder } if (!isStopped) - SystemKeyspace.finishViewBuildStatus(ksname, viewName); - + { + SystemKeyspace.finishViewBuildStatus(ksname, viewName); + updateDistributed(ksname, viewName, localHostId); + } } catch (Exception e) { - final ViewBuilder builder = new ViewBuilder(baseCfs, view); - ScheduledExecutors.nonPeriodicTasks.schedule(() -> CompactionManager.instance.submitViewBuilder(builder), + ScheduledExecutors.nonPeriodicTasks.schedule(() -> CompactionManager.instance.submitViewBuilder(this), 5, TimeUnit.MINUTES); logger.warn("Materialized View failed to complete, sleeping 5 minutes before restarting", e); } } + private void updateDistributed(String ksname, String viewName, UUID localHostId) + { + try + { + SystemDistributedKeyspace.successfulViewBuild(ksname, viewName, localHostId); + SystemKeyspace.setViewBuiltReplicated(ksname, viewName); + } + catch (Exception e) + { + ScheduledExecutors.nonPeriodicTasks.schedule(() -> CompactionManager.instance.submitViewBuilder(this), + 5, + TimeUnit.MINUTES); + logger.warn("Failed to updated the distributed status of view, sleeping 5 minutes before retrying", e); + } + } + public CompactionInfo getCompactionInfo() { long rangesLeft = 0, rangesTotal = 0; http://git-wip-us.apache.org/repos/asf/cassandra/blob/21448c50/src/java/org/apache/cassandra/db/view/ViewManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/ViewManager.java b/src/java/org/apache/cassandra/db/view/ViewManager.java index ac48cfe..faa5551 100644 --- a/src/java/org/apache/cassandra/db/view/ViewManager.java +++ 
b/src/java/org/apache/cassandra/db/view/ViewManager.java @@ -33,6 +33,7 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.repair.SystemDistributedKeyspace; import org.apache.cassandra.service.StorageProxy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -235,6 +236,7 @@ public class ViewManager forTable(view.getDefinition().baseTableId).removeView(name); SystemKeyspace.setViewRemoved(keyspace.getName(), view.name); + SystemDistributedKeyspace.setViewRemoved(keyspace.getName(), view.name); } public void buildAllViews() http://git-wip-us.apache.org/repos/asf/cassandra/blob/21448c50/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java index 2a479b4..fbbc125 100644 --- a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java +++ b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java @@ -23,12 +23,15 @@ import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; import com.google.common.base.Joiner; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.slf4j.Logger; @@ -36,16 +39,19 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.Range; import 
org.apache.cassandra.dht.Token; import org.apache.cassandra.repair.messages.RepairOption; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.Tables; -import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; +import static org.apache.cassandra.utils.ByteBufferUtil.bytes; + public final class SystemDistributedKeyspace { private SystemDistributedKeyspace() @@ -60,6 +66,8 @@ public final class SystemDistributedKeyspace public static final String PARENT_REPAIR_HISTORY = "parent_repair_history"; + public static final String VIEW_BUILD_STATUS = "view_build_status"; + private static final CFMetaData RepairHistory = compile(REPAIR_HISTORY, "Repair history", @@ -95,6 +103,16 @@ public final class SystemDistributedKeyspace + "options map<text, text>," + "PRIMARY KEY (parent_id))"); + private static final CFMetaData ViewBuildStatus = + compile(VIEW_BUILD_STATUS, + "Materialized View build status", + "CREATE TABLE %s (" + + "keyspace_name text," + + "view_name text," + + "host_id uuid," + + "status text," + + "PRIMARY KEY ((keyspace_name, view_name), host_id))"); + private static CFMetaData compile(String name, String description, String schema) { return CFMetaData.compile(String.format(schema, name), NAME) @@ -103,7 +121,7 @@ public final class SystemDistributedKeyspace public static KeyspaceMetadata metadata() { - return KeyspaceMetadata.create(NAME, KeyspaceParams.simple(3), Tables.of(RepairHistory, ParentRepairHistory)); + return KeyspaceMetadata.create(NAME, KeyspaceParams.simple(3), Tables.of(RepairHistory, ParentRepairHistory, ViewBuildStatus)); } public static void startParentRepair(UUID parent_id, String keyspaceName, String[] cfnames, RepairOption options) @@ -220,6 +238,58 @@ public final class SystemDistributedKeyspace processSilent(fmtQry, t.getMessage(), sw.toString()); } + public static void startViewBuild(String keyspace, String view, 
UUID hostId) + { + String query = "INSERT INTO %s.%s (keyspace_name, view_name, host_id, status) VALUES (?, ?, ?, ?)"; + QueryProcessor.process(String.format(query, NAME, VIEW_BUILD_STATUS), + ConsistencyLevel.ONE, + Lists.newArrayList(bytes(keyspace), + bytes(view), + bytes(hostId), + bytes(BuildStatus.STARTED.toString()))); + } + + public static void successfulViewBuild(String keyspace, String view, UUID hostId) + { + String query = "UPDATE %s.%s SET status = ? WHERE keyspace_name = ? AND view_name = ? AND host_id = ?"; + QueryProcessor.process(String.format(query, NAME, VIEW_BUILD_STATUS), + ConsistencyLevel.ONE, + Lists.newArrayList(bytes(BuildStatus.SUCCESS.toString()), + bytes(keyspace), + bytes(view), + bytes(hostId))); + } + + public static Map<UUID, String> viewStatus(String keyspace, String view) + { + String query = "SELECT host_id, status FROM %s.%s WHERE keyspace_name = ? AND view_name = ?"; + UntypedResultSet results; + try + { + results = QueryProcessor.execute(String.format(query, NAME, VIEW_BUILD_STATUS), + ConsistencyLevel.ONE, + keyspace, + view); + } catch (Exception e) { + return Collections.emptyMap(); + } + + + Map<UUID, String> status = new HashMap<>(); + for (UntypedResultSet.Row row : results) + { + status.put(row.getUUID("host_id"), row.getString("status")); + } + return status; + } + + public static void setViewRemoved(String keyspaceName, String viewName) + { + String buildReq = "DELETE FROM %s.%s WHERE keyspace_name = ? AND view_name = ?"; + QueryProcessor.executeInternal(String.format(buildReq, NAME, VIEW_BUILD_STATUS), keyspaceName, viewName); + forceBlockingFlush(VIEW_BUILD_STATUS); + } + private static void processSilent(String fmtQry, String... 
values) { try @@ -227,7 +297,7 @@ public final class SystemDistributedKeyspace List<ByteBuffer> valueList = new ArrayList<>(); for (String v : values) { - valueList.add(ByteBufferUtil.bytes(v)); + valueList.add(bytes(v)); } QueryProcessor.process(fmtQry, ConsistencyLevel.ONE, valueList); } @@ -237,9 +307,19 @@ public final class SystemDistributedKeyspace } } + public static void forceBlockingFlush(String table) + { + if (!Boolean.getBoolean("cassandra.unsafesystem")) + FBUtilities.waitOnFuture(Keyspace.open(NAME).getColumnFamilyStore(table).forceFlush()); + } private enum RepairState { STARTED, SUCCESS, FAILED } + + private enum BuildStatus + { + UNKNOWN, STARTED, SUCCESS + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/21448c50/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index f0f790a..160235a 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -318,21 +318,6 @@ public class CassandraDaemon } } - Runnable viewRebuild = new Runnable() - { - @Override - public void run() - { - for (Keyspace keyspace : Keyspace.all()) - { - keyspace.viewManager.buildAllViews(); - } - } - }; - - ScheduledExecutors.optionalTasks.schedule(viewRebuild, StorageService.RING_DELAY, TimeUnit.MILLISECONDS); - - SystemKeyspace.finishStartup(); // Metrics @@ -363,6 +348,17 @@ public class CassandraDaemon exitOrFail(1, "Fatal configuration error", e); } + // Because we are writing to the system_distributed keyspace, this should happen after that is created, which + // happens in StorageService.instance.initServer() + Runnable viewRebuild = () -> { + for (Keyspace keyspace : Keyspace.all()) + { + keyspace.viewManager.buildAllViews(); + } + }; + + 
ScheduledExecutors.optionalTasks.schedule(viewRebuild, StorageService.RING_DELAY, TimeUnit.MILLISECONDS); + Mx4jTool.maybeLoad(); if (!FBUtilities.getBroadcastAddress().equals(InetAddress.getLoopbackAddress())) http://git-wip-us.apache.org/repos/asf/cassandra/blob/21448c50/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 52dcb97..fe3e982 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -4398,6 +4398,25 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return Collections.unmodifiableList(keyspaceNamesList); } + public Map<String, String> getViewBuildStatuses(String keyspace, String view) + { + Map<UUID, String> coreViewStatus = SystemDistributedKeyspace.viewStatus(keyspace, view); + Map<InetAddress, UUID> hostIdToEndpoint = tokenMetadata.getEndpointToHostIdMapForReading(); + Map<String, String> result = new HashMap<>(); + + for (Map.Entry<InetAddress, UUID> entry : hostIdToEndpoint.entrySet()) + { + UUID hostId = entry.getValue(); + InetAddress endpoint = entry.getKey(); + result.put(endpoint.toString(), + coreViewStatus.containsKey(hostId) + ? 
coreViewStatus.get(hostId) + : "UNKNOWN"); + } + + return Collections.unmodifiableMap(result); + } + public void updateSnitch(String epSnitchClassName, Boolean dynamic, Integer dynamicUpdateInterval, Integer dynamicResetInterval, Double dynamicBadnessThreshold) throws ClassNotFoundException { IEndpointSnitch oldSnitch = DatabaseDescriptor.getEndpointSnitch(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/21448c50/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 2238f33..6598075 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -452,6 +452,8 @@ public interface StorageServiceMBean extends NotificationEmitter public List<String> getNonSystemKeyspaces(); + public Map<String, String> getViewBuildStatuses(String keyspace, String view); + /** * Change endpointsnitch class and dynamic-ness (and dynamic attributes) at runtime * @param epSnitchClassName the canonical path name for a class implementing IEndpointSnitch http://git-wip-us.apache.org/repos/asf/cassandra/blob/21448c50/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 19a90a0..4dc2770 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -871,6 +871,11 @@ public class NodeProbe implements AutoCloseable return spProxy.getHintedHandoffDisabledDCs(); } + public Map<String, String> getViewBuildStatuses(String keyspace, String view) + { + return ssProxy.getViewBuildStatuses(keyspace, view); + } + public void 
pauseHintsDelivery() { hhProxy.pauseHintsDelivery(true); http://git-wip-us.apache.org/repos/asf/cassandra/blob/21448c50/src/java/org/apache/cassandra/tools/NodeTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java index 300c7a0..aca2869 100644 --- a/src/java/org/apache/cassandra/tools/NodeTool.java +++ b/src/java/org/apache/cassandra/tools/NodeTool.java @@ -139,7 +139,8 @@ public class NodeTool EnableHintsForDC.class, FailureDetectorInfo.class, RefreshSizeEstimates.class, - RelocateSSTables.class + RelocateSSTables.class, + ViewBuildStatus.class ); Cli.CliBuilder<Runnable> builder = Cli.builder("nodetool"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/21448c50/src/java/org/apache/cassandra/tools/nodetool/ViewBuildStatus.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/ViewBuildStatus.java b/src/java/org/apache/cassandra/tools/nodetool/ViewBuildStatus.java new file mode 100644 index 0000000..0696396 --- /dev/null +++ b/src/java/org/apache/cassandra/tools/nodetool/ViewBuildStatus.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tools.nodetool; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import io.airlift.command.Arguments; +import io.airlift.command.Command; +import org.apache.cassandra.tools.NodeProbe; +import org.apache.cassandra.tools.NodeTool; +import org.apache.cassandra.tools.nodetool.formatter.TableBuilder; + +import static com.google.common.base.Preconditions.checkArgument; + +@Command(name = "viewbuildstatus", description = "Show progress of a materialized view build") +public class ViewBuildStatus extends NodeTool.NodeToolCmd +{ + private final static String SUCCESS = "SUCCESS"; + + @Arguments(usage = "<keyspace> <view> | <keyspace.view>", description = "The keyspace and view name") + private List<String> args = new ArrayList<>(); + + protected void execute(NodeProbe probe) + { + String keyspace = null, view = null; + if (args.size() == 2) + { + keyspace = args.get(0); + view = args.get(1); + } + else if (args.size() == 1) + { + String[] input = args.get(0).split("\\."); + checkArgument(input.length == 2, "viewbuildstatus requires keyspace and view name arguments"); + keyspace = input[0]; + view = input[1]; + } + else + { + checkArgument(false, "viewbuildstatus requires keyspace and view name arguments"); + } + + Map<String, String> buildStatus = probe.getViewBuildStatuses(keyspace, view); + boolean failed = false; + TableBuilder builder = new TableBuilder(); + + builder.add("Host", "Info"); + for (Map.Entry<String, String> status : buildStatus.entrySet()) + { + if (!status.getValue().equals(SUCCESS)) { + failed = true; + } + builder.add(status.getKey(), status.getValue()); + } + + if (failed) { + System.out.println(String.format("%s.%s has not finished building; node status is below.", keyspace, view)); + System.out.println(); + builder.printTo(System.out); + System.exit(1); + } else { + 
System.out.println(String.format("%s.%s has finished building", keyspace, view)); + System.exit(0); + } + } +}
