Repository: cassandra
Updated Branches:
  refs/heads/trunk 961b5c9f4 -> 21448c508


Add a command to see if a Materialized View has finished building

Patch by Carl Yeksigian; reviewed by Joel Knighton for CASSANDRA-9967


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/21448c50
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/21448c50
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/21448c50

Branch: refs/heads/trunk
Commit: 21448c50891642f95097a9e5ed0a3802bd90a877
Parents: 961b5c9
Author: Carl Yeksigian <[email protected]>
Authored: Thu Mar 31 16:28:20 2016 -0400
Committer: Carl Yeksigian <[email protected]>
Committed: Thu Mar 31 16:28:20 2016 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/cql3/QueryProcessor.java   |  8 +-
 .../org/apache/cassandra/db/SystemKeyspace.java | 31 +++++--
 .../apache/cassandra/db/view/ViewBuilder.java   | 32 +++++++-
 .../apache/cassandra/db/view/ViewManager.java   |  2 +
 .../repair/SystemDistributedKeyspace.java       | 86 +++++++++++++++++++-
 .../cassandra/service/CassandraDaemon.java      | 26 +++---
 .../cassandra/service/StorageService.java       | 19 +++++
 .../cassandra/service/StorageServiceMBean.java  |  2 +
 .../org/apache/cassandra/tools/NodeProbe.java   |  5 ++
 .../org/apache/cassandra/tools/NodeTool.java    |  3 +-
 .../tools/nodetool/ViewBuildStatus.java         | 84 +++++++++++++++++++
 12 files changed, 267 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/21448c50/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f8a4141..0401357 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.6
+ * Add a command to see if a Materialized View has finished building 
(CASSANDRA-9967)
  * Log endpoint and port associated with streaming operation (CASSANDRA-8777)
  * Print sensible units for all log messages (CASSANDRA-9692)
  * Upgrade Netty to version 4.0.34 (CASSANDRA-11096)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/21448c50/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java 
b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 9801b41..1fd8564 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -299,13 +299,19 @@ public class QueryProcessor implements QueryHandler
             return null;
     }
 
+    public static UntypedResultSet execute(String query, ConsistencyLevel cl, 
Object... values)
+    throws RequestExecutionException
+    {
+        return execute(query, cl, internalQueryState(), values);
+    }
+
     public static UntypedResultSet execute(String query, ConsistencyLevel cl, 
QueryState state, Object... values)
     throws RequestExecutionException
     {
         try
         {
             ParsedStatement.Prepared prepared = prepareInternal(query);
-            ResultMessage result = prepared.statement.execute(state, 
makeInternalOptions(prepared, values));
+            ResultMessage result = prepared.statement.execute(state, 
makeInternalOptions(prepared, values, cl));
             if (result instanceof ResultMessage.Rows)
                 return 
UntypedResultSet.create(((ResultMessage.Rows)result).result);
             else

http://git-wip-us.apache.org/repos/asf/cassandra/blob/21448c50/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java 
b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index a51ade0..1427de0 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -262,6 +262,7 @@ public final class SystemKeyspace
                 "CREATE TABLE %s ("
                 + "keyspace_name text,"
                 + "view_name text,"
+                + "status_replicated boolean,"
                 + "PRIMARY KEY ((keyspace_name), view_name))");
 
     @Deprecated
@@ -536,13 +537,23 @@ public final class SystemKeyspace
         return !result.isEmpty();
     }
 
-    public static void setViewBuilt(String keyspaceName, String viewName)
+    public static boolean isViewStatusReplicated(String keyspaceName, String 
viewName)
     {
-        String req = "INSERT INTO %s.\"%s\" (keyspace_name, view_name) VALUES 
(?, ?)";
-        executeInternal(String.format(req, NAME, BUILT_VIEWS), keyspaceName, 
viewName);
-        forceBlockingFlush(BUILT_VIEWS);
+        String req = "SELECT status_replicated FROM %s.\"%s\" WHERE 
keyspace_name=? AND view_name=?";
+        UntypedResultSet result = executeInternal(String.format(req, NAME, 
BUILT_VIEWS), keyspaceName, viewName);
+
+        if (result.isEmpty())
+            return false;
+        UntypedResultSet.Row row = result.one();
+        return row.has("status_replicated") && 
row.getBoolean("status_replicated");
     }
 
+    public static void setViewBuilt(String keyspaceName, String viewName, 
boolean replicated)
+    {
+        String req = "INSERT INTO %s.\"%s\" (keyspace_name, view_name, 
status_replicated) VALUES (?, ?, ?)";
+        executeInternal(String.format(req, NAME, BUILT_VIEWS), keyspaceName, 
viewName, replicated);
+        forceBlockingFlush(BUILT_VIEWS);
+    }
 
     public static void setViewRemoved(String keyspaceName, String viewName)
     {
@@ -568,14 +579,18 @@ public final class SystemKeyspace
         // We flush the view built first, because if we fail now, we'll 
restart at the last place we checkpointed
         // view build.
         // If we flush the delete first, we'll have to restart from the 
beginning.
-        // Also, if the build succeeded, but the view build failed, we will be 
able to skip the view build check
-        // next boot.
-        setViewBuilt(ksname, viewName);
-        forceBlockingFlush(BUILT_VIEWS);
+        // Also, if writing to the built_view succeeds, but the 
view_builds_in_progress deletion fails, we will be able
+        // to skip the view build next boot.
+        setViewBuilt(ksname, viewName, false);
         executeInternal(String.format("DELETE FROM system.%s WHERE 
keyspace_name = ? AND view_name = ?", VIEWS_BUILDS_IN_PROGRESS), ksname, 
viewName);
         forceBlockingFlush(VIEWS_BUILDS_IN_PROGRESS);
     }
 
+    public static void setViewBuiltReplicated(String ksname, String viewName)
+    {
+        setViewBuilt(ksname, viewName, true);
+    }
+
     public static void updateViewBuildStatus(String ksname, String viewName, 
Token token)
     {
         String req = "INSERT INTO system.%s (keyspace_name, view_name, 
last_token) VALUES (?, ?, ?)";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/21448c50/src/java/org/apache/cassandra/db/view/ViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java 
b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
index 8146211..23eeba4 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.repair.SystemDistributedKeyspace;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.pager.QueryPager;
@@ -102,10 +103,15 @@ public class ViewBuilder extends CompactionInfo.Holder
 
     public void run()
     {
+        UUID localHostId = SystemKeyspace.getLocalHostId();
         String ksname = baseCfs.metadata.ksName, viewName = view.name;
 
         if (SystemKeyspace.isViewBuilt(ksname, viewName))
+        {
+            if (!SystemKeyspace.isViewStatusReplicated(ksname, viewName))
+                updateDistributed(ksname, viewName, localHostId);
             return;
+        }
 
         Iterable<Range<Token>> ranges = 
StorageService.instance.getLocalRanges(baseCfs.metadata.ksName);
         final Pair<Integer, Token> buildStatus = 
SystemKeyspace.getViewBuildStatus(ksname, viewName);
@@ -148,6 +154,7 @@ public class ViewBuilder extends CompactionInfo.Holder
         try (Refs<SSTableReader> sstables = 
baseCfs.selectAndReference(function).refs;
              ReducingKeyIterator iter = new ReducingKeyIterator(sstables))
         {
+            SystemDistributedKeyspace.startViewBuild(ksname, viewName, 
localHostId);
             while (!isStopped && iter.hasNext())
             {
                 DecoratedKey key = iter.next();
@@ -172,19 +179,36 @@ public class ViewBuilder extends CompactionInfo.Holder
             }
 
             if (!isStopped)
-            SystemKeyspace.finishViewBuildStatus(ksname, viewName);
-
+            {
+                SystemKeyspace.finishViewBuildStatus(ksname, viewName);
+                updateDistributed(ksname, viewName, localHostId);
+            }
         }
         catch (Exception e)
         {
-            final ViewBuilder builder = new ViewBuilder(baseCfs, view);
-            ScheduledExecutors.nonPeriodicTasks.schedule(() -> 
CompactionManager.instance.submitViewBuilder(builder),
+            ScheduledExecutors.nonPeriodicTasks.schedule(() -> 
CompactionManager.instance.submitViewBuilder(this),
                                                          5,
                                                          TimeUnit.MINUTES);
             logger.warn("Materialized View failed to complete, sleeping 5 
minutes before restarting", e);
         }
     }
 
+    private void updateDistributed(String ksname, String viewName, UUID 
localHostId)
+    {
+        try
+        {
+            SystemDistributedKeyspace.successfulViewBuild(ksname, viewName, 
localHostId);
+            SystemKeyspace.setViewBuiltReplicated(ksname, viewName);
+        }
+        catch (Exception e)
+        {
+            ScheduledExecutors.nonPeriodicTasks.schedule(() -> 
CompactionManager.instance.submitViewBuilder(this),
+                                                         5,
+                                                         TimeUnit.MINUTES);
+            logger.warn("Failed to updated the distributed status of view, 
sleeping 5 minutes before retrying", e);
+        }
+    }
+
     public CompactionInfo getCompactionInfo()
     {
         long rangesLeft = 0, rangesTotal = 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/21448c50/src/java/org/apache/cassandra/db/view/ViewManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewManager.java 
b/src/java/org/apache/cassandra/db/view/ViewManager.java
index ac48cfe..faa5551 100644
--- a/src/java/org/apache/cassandra/db/view/ViewManager.java
+++ b/src/java/org/apache/cassandra/db/view/ViewManager.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.repair.SystemDistributedKeyspace;
 import org.apache.cassandra.service.StorageProxy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -235,6 +236,7 @@ public class ViewManager
 
         forTable(view.getDefinition().baseTableId).removeView(name);
         SystemKeyspace.setViewRemoved(keyspace.getName(), view.name);
+        SystemDistributedKeyspace.setViewRemoved(keyspace.getName(), 
view.name);
     }
 
     public void buildAllViews()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/21448c50/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java 
b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
index 2a479b4..fbbc125 100644
--- a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
+++ b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
@@ -23,12 +23,15 @@ import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
 import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
 import org.slf4j.Logger;
@@ -36,16 +39,19 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.repair.messages.RepairOption;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Tables;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
 public final class SystemDistributedKeyspace
 {
     private SystemDistributedKeyspace()
@@ -60,6 +66,8 @@ public final class SystemDistributedKeyspace
 
     public static final String PARENT_REPAIR_HISTORY = "parent_repair_history";
 
+    public static final String VIEW_BUILD_STATUS = "view_build_status";
+
     private static final CFMetaData RepairHistory =
         compile(REPAIR_HISTORY,
                 "Repair history",
@@ -95,6 +103,16 @@ public final class SystemDistributedKeyspace
                      + "options map<text, text>,"
                      + "PRIMARY KEY (parent_id))");
 
+    private static final CFMetaData ViewBuildStatus =
+    compile(VIEW_BUILD_STATUS,
+            "Materialized View build status",
+            "CREATE TABLE %s ("
+                     + "keyspace_name text,"
+                     + "view_name text,"
+                     + "host_id uuid,"
+                     + "status text,"
+                     + "PRIMARY KEY ((keyspace_name, view_name), host_id))");
+
     private static CFMetaData compile(String name, String description, String 
schema)
     {
         return CFMetaData.compile(String.format(schema, name), NAME)
@@ -103,7 +121,7 @@ public final class SystemDistributedKeyspace
 
     public static KeyspaceMetadata metadata()
     {
-        return KeyspaceMetadata.create(NAME, KeyspaceParams.simple(3), 
Tables.of(RepairHistory, ParentRepairHistory));
+        return KeyspaceMetadata.create(NAME, KeyspaceParams.simple(3), 
Tables.of(RepairHistory, ParentRepairHistory, ViewBuildStatus));
     }
 
     public static void startParentRepair(UUID parent_id, String keyspaceName, 
String[] cfnames, RepairOption options)
@@ -220,6 +238,58 @@ public final class SystemDistributedKeyspace
         processSilent(fmtQry, t.getMessage(), sw.toString());
     }
 
+    public static void startViewBuild(String keyspace, String view, UUID 
hostId)
+    {
+        String query = "INSERT INTO %s.%s (keyspace_name, view_name, host_id, 
status) VALUES (?, ?, ?, ?)";
+        QueryProcessor.process(String.format(query, NAME, VIEW_BUILD_STATUS),
+                               ConsistencyLevel.ONE,
+                               Lists.newArrayList(bytes(keyspace),
+                                                  bytes(view),
+                                                  bytes(hostId),
+                                                  
bytes(BuildStatus.STARTED.toString())));
+    }
+
+    public static void successfulViewBuild(String keyspace, String view, UUID 
hostId)
+    {
+        String query = "UPDATE %s.%s SET status = ? WHERE keyspace_name = ? 
AND view_name = ? AND host_id = ?";
+        QueryProcessor.process(String.format(query, NAME, VIEW_BUILD_STATUS),
+                               ConsistencyLevel.ONE,
+                               
Lists.newArrayList(bytes(BuildStatus.SUCCESS.toString()),
+                                                  bytes(keyspace),
+                                                  bytes(view),
+                                                  bytes(hostId)));
+    }
+
+    public static Map<UUID, String> viewStatus(String keyspace, String view)
+    {
+        String query = "SELECT host_id, status FROM %s.%s WHERE keyspace_name 
= ? AND view_name = ?";
+        UntypedResultSet results;
+        try
+        {
+            results = QueryProcessor.execute(String.format(query, NAME, 
VIEW_BUILD_STATUS),
+                                             ConsistencyLevel.ONE,
+                                             keyspace,
+                                             view);
+        } catch (Exception e) {
+            return Collections.emptyMap();
+        }
+
+
+        Map<UUID, String> status = new HashMap<>();
+        for (UntypedResultSet.Row row : results)
+        {
+            status.put(row.getUUID("host_id"), row.getString("status"));
+        }
+        return status;
+    }
+
+    public static void setViewRemoved(String keyspaceName, String viewName)
+    {
+        String buildReq = "DELETE FROM %s.%s WHERE keyspace_name = ? AND 
view_name = ?";
+        QueryProcessor.executeInternal(String.format(buildReq, NAME, 
VIEW_BUILD_STATUS), keyspaceName, viewName);
+        forceBlockingFlush(VIEW_BUILD_STATUS);
+    }
+
     private static void processSilent(String fmtQry, String... values)
     {
         try
@@ -227,7 +297,7 @@ public final class SystemDistributedKeyspace
             List<ByteBuffer> valueList = new ArrayList<>();
             for (String v : values)
             {
-                valueList.add(ByteBufferUtil.bytes(v));
+                valueList.add(bytes(v));
             }
             QueryProcessor.process(fmtQry, ConsistencyLevel.ONE, valueList);
         }
@@ -237,9 +307,19 @@ public final class SystemDistributedKeyspace
         }
     }
 
+    public static void forceBlockingFlush(String table)
+    {
+        if (!Boolean.getBoolean("cassandra.unsafesystem"))
+            
FBUtilities.waitOnFuture(Keyspace.open(NAME).getColumnFamilyStore(table).forceFlush());
+    }
 
     private enum RepairState
     {
         STARTED, SUCCESS, FAILED
     }
+
+    private enum BuildStatus
+    {
+        UNKNOWN, STARTED, SUCCESS
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/21448c50/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java 
b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index f0f790a..160235a 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -318,21 +318,6 @@ public class CassandraDaemon
             }
         }
 
-        Runnable viewRebuild = new Runnable()
-        {
-            @Override
-            public void run()
-            {
-                for (Keyspace keyspace : Keyspace.all())
-                {
-                    keyspace.viewManager.buildAllViews();
-                }
-            }
-        };
-
-        ScheduledExecutors.optionalTasks.schedule(viewRebuild, 
StorageService.RING_DELAY, TimeUnit.MILLISECONDS);
-
-
         SystemKeyspace.finishStartup();
 
         // Metrics
@@ -363,6 +348,17 @@ public class CassandraDaemon
             exitOrFail(1, "Fatal configuration error", e);
         }
 
+        // Because we are writing to the system_distributed keyspace, this 
should happen after that is created, which
+        // happens in StorageService.instance.initServer()
+        Runnable viewRebuild = () -> {
+            for (Keyspace keyspace : Keyspace.all())
+            {
+                keyspace.viewManager.buildAllViews();
+            }
+        };
+
+        ScheduledExecutors.optionalTasks.schedule(viewRebuild, 
StorageService.RING_DELAY, TimeUnit.MILLISECONDS);
+
         Mx4jTool.maybeLoad();
 
         if 
(!FBUtilities.getBroadcastAddress().equals(InetAddress.getLoopbackAddress()))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/21448c50/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 52dcb97..fe3e982 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -4398,6 +4398,25 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         return Collections.unmodifiableList(keyspaceNamesList);
     }
 
+    public Map<String, String> getViewBuildStatuses(String keyspace, String 
view)
+    {
+        Map<UUID, String> coreViewStatus = 
SystemDistributedKeyspace.viewStatus(keyspace, view);
+        Map<InetAddress, UUID> hostIdToEndpoint = 
tokenMetadata.getEndpointToHostIdMapForReading();
+        Map<String, String> result = new HashMap<>();
+
+        for (Map.Entry<InetAddress, UUID> entry : hostIdToEndpoint.entrySet())
+        {
+            UUID hostId = entry.getValue();
+            InetAddress endpoint = entry.getKey();
+            result.put(endpoint.toString(),
+                       coreViewStatus.containsKey(hostId)
+                       ? coreViewStatus.get(hostId)
+                       : "UNKNOWN");
+        }
+
+        return Collections.unmodifiableMap(result);
+    }
+
     public void updateSnitch(String epSnitchClassName, Boolean dynamic, 
Integer dynamicUpdateInterval, Integer dynamicResetInterval, Double 
dynamicBadnessThreshold) throws ClassNotFoundException
     {
         IEndpointSnitch oldSnitch = DatabaseDescriptor.getEndpointSnitch();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/21448c50/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 2238f33..6598075 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -452,6 +452,8 @@ public interface StorageServiceMBean extends 
NotificationEmitter
 
     public List<String> getNonSystemKeyspaces();
 
+    public Map<String, String> getViewBuildStatuses(String keyspace, String 
view);
+
     /**
      * Change endpointsnitch class and dynamic-ness (and dynamic attributes) 
at runtime
      * @param epSnitchClassName        the canonical path name for a class 
implementing IEndpointSnitch

http://git-wip-us.apache.org/repos/asf/cassandra/blob/21448c50/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java 
b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 19a90a0..4dc2770 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -871,6 +871,11 @@ public class NodeProbe implements AutoCloseable
         return spProxy.getHintedHandoffDisabledDCs();
     }
 
+    public Map<String, String> getViewBuildStatuses(String keyspace, String 
view)
+    {
+        return ssProxy.getViewBuildStatuses(keyspace, view);
+    }
+
     public void pauseHintsDelivery()
     {
         hhProxy.pauseHintsDelivery(true);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/21448c50/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java 
b/src/java/org/apache/cassandra/tools/NodeTool.java
index 300c7a0..aca2869 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -139,7 +139,8 @@ public class NodeTool
                 EnableHintsForDC.class,
                 FailureDetectorInfo.class,
                 RefreshSizeEstimates.class,
-                RelocateSSTables.class
+                RelocateSSTables.class,
+                ViewBuildStatus.class
         );
 
         Cli.CliBuilder<Runnable> builder = Cli.builder("nodetool");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/21448c50/src/java/org/apache/cassandra/tools/nodetool/ViewBuildStatus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/ViewBuildStatus.java 
b/src/java/org/apache/cassandra/tools/nodetool/ViewBuildStatus.java
new file mode 100644
index 0000000..0696396
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/ViewBuildStatus.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.nodetool;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import io.airlift.command.Arguments;
+import io.airlift.command.Command;
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool;
+import org.apache.cassandra.tools.nodetool.formatter.TableBuilder;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+@Command(name = "viewbuildstatus", description = "Show progress of a 
materialized view build")
+public class ViewBuildStatus extends NodeTool.NodeToolCmd
+{
+    private final static String SUCCESS = "SUCCESS";
+
+    @Arguments(usage = "<keyspace> <view> | <keyspace.view>", description = 
"The keyspace and view name")
+    private List<String> args = new ArrayList<>();
+
+    protected void execute(NodeProbe probe)
+    {
+        String keyspace = null, view = null;
+        if (args.size() == 2)
+        {
+            keyspace = args.get(0);
+            view = args.get(1);
+        }
+        else if (args.size() == 1)
+        {
+            String[] input = args.get(0).split("\\.");
+            checkArgument(input.length == 2, "viewbuildstatus requires 
keyspace and view name arguments");
+            keyspace = input[0];
+            view = input[1];
+        }
+        else
+        {
+            checkArgument(false, "viewbuildstatus requires keyspace and view 
name arguments");
+        }
+
+        Map<String, String> buildStatus = probe.getViewBuildStatuses(keyspace, 
view);
+        boolean failed = false;
+        TableBuilder builder = new TableBuilder();
+
+        builder.add("Host", "Info");
+        for (Map.Entry<String, String> status : buildStatus.entrySet())
+        {
+            if (!status.getValue().equals(SUCCESS)) {
+                failed = true;
+            }
+            builder.add(status.getKey(), status.getValue());
+        }
+
+        if (failed) {
+            System.out.println(String.format("%s.%s has not finished building; 
node status is below.", keyspace, view));
+            System.out.println();
+            builder.printTo(System.out);
+            System.exit(1);
+        } else {
+            System.out.println(String.format("%s.%s has finished building", 
keyspace, view));
+            System.exit(0);
+        }
+    }
+}

Reply via email to