This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6df595a  Fix Spellings and Code Cleanup (#3181)
6df595a is described below

commit 6df595a341a884404242b8d6dcf0d686e71907ee
Author: Ali Ahmed <alahmed...@gmail.com>
AuthorDate: Thu Dec 13 06:15:34 2018 -0800

    Fix Spellings and Code Cleanup (#3181)
    
    * Fix Spellings and Code Cleanup
    
    * Fix Code comments
    
    * Remove Tenant check
    
    * Fix test cases
---
 .../pulsar/broker/admin/impl/BrokerStatsBase.java  |   2 +-
 .../pulsar/broker/admin/impl/BrokersBase.java      |   2 +-
 .../pulsar/broker/admin/impl/ClustersBase.java     |   4 +-
 .../broker/admin/impl/ResourceQuotasBase.java      |   4 +-
 .../apache/pulsar/broker/admin/v2/BrokerStats.java |   2 +-
 .../java/org/apache/pulsar/admin/cli/CmdSinks.java |   2 +-
 .../org/apache/pulsar/admin/cli/CmdSources.java    |   2 +-
 .../apache/pulsar/admin/cli/TestCmdSources.java    |   4 +-
 .../functions/worker/rest/api/ComponentImpl.java   | 246 ++++++++++++++-------
 .../functions/worker/rest/api/FunctionsImpl.java   |  22 +-
 .../worker/rest/api/FunctionsMetricsResource.java  |   3 +-
 .../pulsar/functions/worker/rest/api/SinkImpl.java |  33 ++-
 .../functions/worker/rest/api/SourceImpl.java      |  28 ++-
 .../functions/worker/rest/api/WorkerImpl.java      |  25 ++-
 14 files changed, 246 insertions(+), 133 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokerStatsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokerStatsBase.java
index 03fc9a2..43b9ba4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokerStatsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokerStatsBase.java
@@ -87,7 +87,7 @@ public class BrokerStatsBase extends AdminResource {
 
     @GET
     @Path("/destinations")
-    @ApiOperation(value = "Get all the topic stats by namesapce", response = 
OutputStream.class, responseContainer = "OutputStream") // 
https://github.com/swagger-api/swagger-ui/issues/558
+    @ApiOperation(value = "Get all the topic stats by namespace", response = 
OutputStream.class, responseContainer = "OutputStream") // 
https://github.com/swagger-api/swagger-ui/issues/558
                                                                                
                                                            // map
                                                                                
                                                            // support
                                                                                
                                                            // missing
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index c4d504c..900ad7c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -117,7 +117,7 @@ public class BrokersBase extends AdminResource {
             @ApiResponse(code = 403, message = "You don't have admin 
permission to update service-configuration"),
             @ApiResponse(code = 404, message = "Configuration not found"),
             @ApiResponse(code = 412, message = "Configuration can't be updated 
dynamically") })
-    public void updateDynamicConfiguration(@PathParam("configName") String 
configName, @PathParam("configValue") String configValue) throws Exception{
+    public void updateDynamicConfiguration(@PathParam("configName") String 
configName, @PathParam("configValue") String configValue) throws Exception {
         validateSuperUserAccess();
         updateDynamicConfigurationOnZk(configName, configValue);
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index 3fcb7d6..b65d268 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -344,7 +344,7 @@ public class ClustersBase extends AdminResource {
                     .get(path("clusters", cluster, 
NAMESPACE_ISOLATION_POLICIES))
                     .orElseThrow(() -> new RestException(Status.NOT_FOUND,
                             "NamespaceIsolationPolicies for cluster " + 
cluster + " does not exist"));
-            // construct the response to NamespaceisolationData map
+            // construct the response to Namespace isolation data map
             return nsIsolationPolicies.getPolicies();
         } catch (Exception e) {
             log.error("[{}] Failed to get 
clusters/{}/namespaceIsolationPolicies", clientAppId(), cluster, e);
@@ -368,7 +368,7 @@ public class ClustersBase extends AdminResource {
                     .get(path("clusters", cluster, 
NAMESPACE_ISOLATION_POLICIES))
                     .orElseThrow(() -> new RestException(Status.NOT_FOUND,
                             "NamespaceIsolationPolicies for cluster " + 
cluster + " does not exist"));
-            // construct the response to NamespaceisolationData map
+            // construct the response to Namespace isolation data map
             if (!nsIsolationPolicies.getPolicies().containsKey(policyName)) {
                 log.info("[{}] Cannot find NamespaceIsolationPolicy {} for 
cluster {}", policyName, cluster);
                 throw new RestException(Status.NOT_FOUND,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java
index 2c664f0..3f502da 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java
@@ -94,7 +94,7 @@ public abstract class ResourceQuotasBase extends NamespacesBase {
         } catch (KeeperException.NoNodeException e) {
             log.warn("[{}] Failed to set resource quota for namespace bundle 
{}: concurrent modification",
                     clientAppId(), nsBundle.toString());
-            throw new RestException(Status.CONFLICT, "Cuncurrent modification 
on namespace bundle quota");
+            throw new RestException(Status.CONFLICT, "Concurrent modification 
on namespace bundle quota");
         } catch (Exception e) {
             log.error("[{}] Failed to set resource quota for namespace bundle 
{}", clientAppId(), nsBundle.toString());
             throw new RestException(e);
@@ -123,7 +123,7 @@ public abstract class ResourceQuotasBase extends NamespacesBase {
         } catch (KeeperException.NoNodeException e) {
             log.warn("[{}] Failed to unset resource quota for namespace bundle 
{}: concurrent modification",
                     clientAppId(), nsBundle.toString());
-            throw new RestException(Status.CONFLICT, "Cuncurrent modification 
on namespace bundle quota");
+            throw new RestException(Status.CONFLICT, "Concurrent modification 
on namespace bundle quota");
         } catch (Exception e) {
             log.error("[{}] Failed to unset resource quota for namespace 
bundle {}", clientAppId(),
                     nsBundle.toString());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/BrokerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/BrokerStats.java
index 5fd5783..a04fded 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/BrokerStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/BrokerStats.java
@@ -43,7 +43,7 @@ public class BrokerStats extends BrokerStatsBase {
     @GET
     @Path("/topics")
     @ApiOperation(
-            value = "Get all the topic stats by namesapce",
+            value = "Get all the topic stats by namespace",
             response = OutputStream.class,
             responseContainer = "OutputStream")
     // https://github.com/swagger-api/swagger-ui/issues/558
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 47ce6e4..5d1b9c0 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -86,7 +86,7 @@ public class CmdSinks extends CmdBase {
         jcommander.addCommand("delete", deleteSink);
         jcommander.addCommand("list", listSinks);
         jcommander.addCommand("get", getSink);
-        // TODO depecreate getstatus
+        // TODO deprecate getstatus
         jcommander.addCommand("status", getSinkStatus, "getstatus");
         jcommander.addCommand("stop", stopSink);
         jcommander.addCommand("restart", restartSink);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 9ec950a..7acd85b 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -382,7 +382,7 @@ public class CmdSources extends CmdBase {
 
         protected void validateSourceConfigs(SourceConfig sourceConfig) {
             if (isBlank(sourceConfig.getArchive())) {
-                throw new ParameterException("Source archive not specfied");
+                throw new ParameterException("Source archive not specified");
             }
             
org.apache.pulsar.common.functions.Utils.inferMissingArguments(sourceConfig);
             if 
(!Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive()) &&
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index 2eb951a..bf9c079 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -178,7 +178,7 @@ public class TestCmdSources {
         );
     }
 
-    @Test(expectedExceptions = ParameterException.class, 
expectedExceptionsMessageRegExp = "Source archive not specfied")
+    @Test(expectedExceptions = ParameterException.class, 
expectedExceptionsMessageRegExp = "Source archive not specified")
     public void testMissingArchive() throws Exception {
         SourceConfig sourceConfig = getSourceConfig();
         sourceConfig.setArchive(null);
@@ -356,7 +356,7 @@ public class TestCmdSources {
         testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
     }
 
-    @Test(expectedExceptions = ParameterException.class, 
expectedExceptionsMessageRegExp = "Source archive not specfied")
+    @Test(expectedExceptions = ParameterException.class, 
expectedExceptionsMessageRegExp = "Source archive not specified")
     public void testCmdSourceConfigFileMissingJar() throws Exception {
         SourceConfig testSourceConfig = getSourceConfig();
         testSourceConfig.setArchive(null);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index dd2dce2..fa8f783 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -153,12 +153,16 @@ public abstract class ComponentImpl {
 
         public abstract T notScheduledInstance();
 
-        public abstract T 
fromFunctionStatusProto(InstanceCommunication.FunctionStatus status, String 
assignedWorkerId);
+        public abstract T fromFunctionStatusProto(final 
InstanceCommunication.FunctionStatus status,
+                                                  final String 
assignedWorkerId);
 
-        public abstract T notRunning(String assignedWorkerId, String error);
+        public abstract T notRunning(final String assignedWorkerId, final 
String error);
 
-        public T getComponentInstanceStatus(String tenant, String namespace,
-                                            String name, int instanceId, URI 
uri) {
+        public T getComponentInstanceStatus(final String tenant,
+                                            final String namespace,
+                                            final String name,
+                                            final int instanceId,
+                                            final URI uri) {
 
             Function.Assignment assignment;
             if 
(worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged()) {
@@ -218,16 +222,23 @@ public abstract class ComponentImpl {
             }
         }
 
-        public abstract S getStatus(String tenant, String namespace,
-                                    String name, 
Collection<Function.Assignment> assignments, URI uri) throws 
PulsarAdminException;
+        public abstract S getStatus(final String tenant,
+                                    final String namespace,
+                                    final String name,
+                                    final Collection<Function.Assignment> 
assignments,
+                                    final URI uri) throws PulsarAdminException;
 
-        public abstract S getStatusExternal(String tenant, String namespace,
-                                            String name, int parallelism);
+        public abstract S getStatusExternal(final String tenant,
+                                            final String namespace,
+                                            final String name,
+                                            final int parallelism);
 
-        public abstract S emptyStatus(int parallelism);
+        public abstract S emptyStatus(final int parallelism);
 
-        public S getComponentStatus(String tenant, String namespace,
-                                    String name, URI uri) {
+        public S getComponentStatus(final String tenant,
+                                    final String namespace,
+                                    final String name,
+                                    final URI uri) {
 
             Function.FunctionMetaData functionMetaData = 
worker().getFunctionMetaDataManager().getFunctionMetaData(tenant, namespace, 
name);
 
@@ -291,10 +302,14 @@ public abstract class ComponentImpl {
         return true;
     }
 
-    public Response registerFunction(final String tenant, final String 
namespace, final String componentName,
-                                     final InputStream uploadedInputStream, 
final FormDataContentDisposition fileDetail,
-                                     final String functionPkgUrl, final String 
functionDetailsJson, final String componentConfigJson,
-
+    public Response registerFunction(final String tenant,
+                                     final String namespace,
+                                     final String componentName,
+                                     final InputStream uploadedInputStream,
+                                     final FormDataContentDisposition 
fileDetail,
+                                     final String functionPkgUrl,
+                                     final String functionDetailsJson,
+                                     final String componentConfigJson,
                                      final String clientRole) {
 
         if (!isWorkerServiceAvailable()) {
@@ -315,9 +330,11 @@ public abstract class ComponentImpl {
         }
 
         try {
-            TenantInfo tenantInfo = 
worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
-            String qualifedNamespace = tenant + "/" + namespace;
-            if 
(!worker().getBrokerAdmin().namespaces().getNamespaces(tenant).contains(qualifedNamespace))
 {
+            // Check tenant exists
+            final TenantInfo tenantInfo = 
worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
+
+            String qualifiedNamespace = tenant + "/" + namespace;
+            if 
(!worker().getBrokerAdmin().namespaces().getNamespaces(tenant).contains(qualifiedNamespace))
 {
                 log.error("{}/{}/{} Namespace {} does not exist", tenant, 
namespace,
                         componentName, namespace);
                 return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
@@ -409,10 +426,10 @@ public abstract class ComponentImpl {
         return updateRequest(functionMetaDataBuilder.build());
     }
 
-    private PackageLocationMetaData.Builder 
getFunctionPackageLocation(FunctionDetails functionDetails,
-                                                                       String 
functionPkgUrl,
+    private PackageLocationMetaData.Builder getFunctionPackageLocation(final 
FunctionDetails functionDetails,
+                                                                       final 
String functionPkgUrl,
                                                                        final 
FormDataContentDisposition fileDetail,
-                                                                       File 
uploadedInputStreamAsFile) throws Exception {
+                                                                       final 
File uploadedInputStreamAsFile) throws Exception {
         String tenant = functionDetails.getTenant();
         String namespace = functionDetails.getNamespace();
         String componentName = functionDetails.getName();
@@ -466,9 +483,14 @@ public abstract class ComponentImpl {
     }
 
 
-    public Response updateFunction(final String tenant, final String 
namespace, final String componentName,
-                                   final InputStream uploadedInputStream, 
final FormDataContentDisposition fileDetail,
-                                   final String functionPkgUrl, final String 
functionDetailsJson, final String componentConfigJson,
+    public Response updateFunction(final String tenant,
+                                   final String namespace,
+                                   final String componentName,
+                                   final InputStream uploadedInputStream,
+                                   final FormDataContentDisposition fileDetail,
+                                   final String functionPkgUrl,
+                                   final String functionDetailsJson,
+                                   final String componentConfigJson,
                                    final String clientRole) {
 
         if (!isWorkerServiceAvailable()) {
@@ -516,7 +538,7 @@ public abstract class ComponentImpl {
             FunctionConfig existingFunctionConfig = 
FunctionConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
             existingComponentConfigJson = new 
Gson().toJson(existingFunctionConfig);
             FunctionConfig functionConfig = new 
Gson().fromJson(componentConfigJson, FunctionConfig.class);
-            // The rest end points take precendence over whatever is there in 
functionconfig
+            // The rest end points take precedence over whatever is there in 
functionconfig
             functionConfig.setTenant(tenant);
             functionConfig.setNamespace(namespace);
             functionConfig.setName(componentName);
@@ -531,7 +553,7 @@ public abstract class ComponentImpl {
             SourceConfig existingSourceConfig = 
SourceConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
             existingComponentConfigJson = new 
Gson().toJson(existingSourceConfig);
             SourceConfig sourceConfig = new 
Gson().fromJson(componentConfigJson, SourceConfig.class);
-            // The rest end points take precendence over whatever is there in 
functionconfig
+            // The rest end points take precedence over whatever is there in 
functionconfig
             sourceConfig.setTenant(tenant);
             sourceConfig.setNamespace(namespace);
             sourceConfig.setName(componentName);
@@ -546,7 +568,7 @@ public abstract class ComponentImpl {
             SinkConfig existingSinkConfig = 
SinkConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
             existingComponentConfigJson = new 
Gson().toJson(existingSinkConfig);
             SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, 
SinkConfig.class);
-            // The rest end points take precendence over whatever is there in 
functionconfig
+            // The rest end points take precedence over whatever is there in 
functionconfig
             sinkConfig.setTenant(tenant);
             sinkConfig.setNamespace(namespace);
             sinkConfig.setName(componentName);
@@ -570,7 +592,6 @@ public abstract class ComponentImpl {
         if (uploadedInputStream != null) {
             uploadedInputStreamAsFile = dumpToTmpFile(uploadedInputStream);
         }
-        File existingPackageAsFile = null;
 
         // validate parameters
         try {
@@ -581,7 +602,8 @@ public abstract class ComponentImpl {
                 functionDetails = validateUpdateRequestParams(tenant, 
namespace, componentName, uploadedInputStreamAsFile,
                         fileDetail, functionDetailsJson, 
mergedComponentConfigJson, componentType);
             } else {
-                functionDetails = 
validateUpdateRequestParamsWithExistingMetadata(tenant, namespace, 
componentName, existingComponent.getPackageLocation(), 
mergedComponentConfigJson, componentType);
+                functionDetails = 
validateUpdateRequestParamsWithExistingMetadata(
+                        tenant, namespace, componentName, 
existingComponent.getPackageLocation(), mergedComponentConfigJson, 
componentType);
             }
         } catch (Exception e) {
             log.error("Invalid update {} request @ /{}/{}/{}", componentType, 
tenant, namespace, componentName, e);
@@ -618,8 +640,10 @@ public abstract class ComponentImpl {
         return updateRequest(functionMetaDataBuilder.build());
     }
 
-    public Response deregisterFunction(final String tenant, final String 
namespace, final String componentName,
-                                       String clientRole) {
+    public Response deregisterFunction(final String tenant,
+                                       final String namespace,
+                                       final String componentName,
+                                       final String clientRole) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -702,8 +726,9 @@ public abstract class ComponentImpl {
         return 
Response.status(Status.OK).entity(requestResult.toJson()).build();
     }
 
-    public Response getFunctionInfo(final String tenant, final String 
namespace, final String componentName)
-            throws IOException {
+    public Response getFunctionInfo(final String tenant,
+                                    final String namespace,
+                                    final String componentName) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -731,32 +756,42 @@ public abstract class ComponentImpl {
                     .entity(new ErrorData(String.format(componentType + " %s 
doesn't exist", componentName))).build();
         }
 
-        String retval;
+        String retVal;
         if (componentType.equals(FUNCTION)) {
             FunctionConfig config = 
FunctionConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
-            retval = new Gson().toJson(config);
+            retVal = new Gson().toJson(config);
         } else if (componentType.equals(SOURCE)) {
             SourceConfig config = 
SourceConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
-            retval = new Gson().toJson(config);
+            retVal = new Gson().toJson(config);
         } else {
             SinkConfig config = 
SinkConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
-            retval = new Gson().toJson(config);
+            retVal = new Gson().toJson(config);
         }
-        return Response.status(Status.OK).entity(retval).build();
+        return Response.status(Status.OK).entity(retVal).build();
     }
 
-    public Response stopFunctionInstance(final String tenant, final String 
namespace, final String componentName,
-                                         final String instanceId, URI uri) {
+    public Response stopFunctionInstance(final String tenant,
+                                         final String namespace,
+                                         final String componentName,
+                                         final String instanceId,
+                                         final URI uri) {
         return stopFunctionInstance(tenant, namespace, componentName, 
instanceId, false, uri);
     }
 
-    public Response restartFunctionInstance(final String tenant, final String 
namespace, final String componentName,
-                                            final String instanceId, URI uri) {
+    public Response restartFunctionInstance(final String tenant,
+                                            final String namespace,
+                                            final String componentName,
+                                            final String instanceId,
+                                            final URI uri) {
         return stopFunctionInstance(tenant, namespace, componentName, 
instanceId, true, uri);
     }
 
-    public Response stopFunctionInstance(final String tenant, final String 
namespace, final String componentName,
-                                         final String instanceId, boolean 
restart, URI uri) {
+    public Response stopFunctionInstance(final String tenant,
+                                         final String namespace,
+                                         final String componentName,
+                                         final String instanceId,
+                                         final boolean restart,
+                                         final URI uri) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -797,16 +832,22 @@ public abstract class ComponentImpl {
         }
     }
 
-    public Response stopFunctionInstances(final String tenant, final String 
namespace, final String componentName) {
+    public Response stopFunctionInstances(final String tenant,
+                                          final String namespace,
+                                          final String componentName) {
         return stopFunctionInstances(tenant, namespace, componentName, false);
     }
 
-    public Response restartFunctionInstances(final String tenant, final String 
namespace, final String componentName) {
+    public Response restartFunctionInstances(final String tenant,
+                                             final String namespace,
+                                             final String componentName) {
         return stopFunctionInstances(tenant, namespace, componentName, true);
     }
 
-    public Response stopFunctionInstances(final String tenant, final String 
namespace, final String componentName,
-                                          boolean restart) {
+    public Response stopFunctionInstances(final String tenant,
+                                          final String namespace,
+                                          final String componentName,
+                                          final boolean restart) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -846,8 +887,10 @@ public abstract class ComponentImpl {
         }
     }
 
-    public FunctionStats getFunctionStats(final String tenant, final String 
namespace, final String componentName,
-                                          URI uri) throws IOException {
+    public FunctionStats getFunctionStats(final String tenant,
+                                          final String namespace,
+                                          final String componentName,
+                                          final URI uri) {
         if (!isWorkerServiceAvailable()) {
             throw new RestException(Status.SERVICE_UNAVAILABLE, "Function 
worker service is not done initializing. Please try again in a little while.");
         }
@@ -887,8 +930,11 @@ public abstract class ComponentImpl {
 
     }
 
-    public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData 
getFunctionsInstanceStats(final String tenant, final String namespace, final 
String componentName,
-                                                                               
                    String instanceId, URI uri) {
+    public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData 
getFunctionsInstanceStats(final String tenant,
+                                                                               
                    final String namespace,
+                                                                               
                    final String componentName,
+                                                                               
                    final String instanceId,
+                                                                               
                    final URI uri) {
         if (!isWorkerServiceAvailable()) {
             throw new RestException(Status.SERVICE_UNAVAILABLE, "Function 
worker service is not done initializing. Please try again in a little while.");
         }
@@ -997,14 +1043,17 @@ public abstract class ComponentImpl {
         return this.worker().getConnectorsManager().getConnectors();
     }
 
-    public Response triggerFunction(final String tenant, final String 
namespace, final String functionName,
-                                    final String input, final InputStream 
uploadedInputStream, final String topic) {
+    public Response triggerFunction(final String tenant,
+                                    final String namespace,
+                                    final String functionName,
+                                    final String input,
+                                    final InputStream uploadedInputStream,
+                                    final String topic) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
         }
 
-        FunctionDetails functionDetails;
         // validate parameters
         try {
             validateTriggerRequestParams(tenant, namespace, functionName, 
topic, input, uploadedInputStream);
@@ -1098,8 +1147,10 @@ public abstract class ComponentImpl {
         }
     }
 
-    public Response getFunctionState(final String tenant, final String 
namespace,
-                                     final String functionName, final String 
key) {
+    public Response getFunctionState(final String tenant,
+                                     final String namespace,
+                                     final String functionName,
+                                     final String key) {
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
         }
@@ -1209,7 +1260,7 @@ public abstract class ComponentImpl {
         }).build();
     }
 
-    private void validateListFunctionRequestParams(String tenant, String 
namespace) throws IllegalArgumentException {
+    private void validateListFunctionRequestParams(final String tenant, final 
String namespace) throws IllegalArgumentException {
 
         if (tenant == null) {
             throw new IllegalArgumentException("Tenant is not provided");
@@ -1219,8 +1270,11 @@ public abstract class ComponentImpl {
         }
     }
 
-    protected void validateGetFunctionInstanceRequestParams(String tenant, 
String namespace, String componentName,
-                                                          ComponentType 
componentType, String instanceId) throws IllegalArgumentException {
+    protected void validateGetFunctionInstanceRequestParams(final String 
tenant,
+                                                            final String 
namespace,
+                                                            final String 
componentName,
+                                                            final 
ComponentType componentType,
+                                                            final String 
instanceId) throws IllegalArgumentException {
         validateGetFunctionRequestParams(tenant, namespace, componentName, 
componentType);
         if (instanceId == null) {
             throw new IllegalArgumentException(String.format("%s Instance Id 
is not provided", componentType));
@@ -1255,10 +1309,14 @@ public abstract class ComponentImpl {
         }
     }
 
-    private FunctionDetails validateUpdateRequestParamsWithPkgUrl(String 
tenant, String namespace, String componentName,
-                                                                  String 
functionPkgUrl, String functionDetailsJson, String componentConfigJson,
-                                                                  
ComponentType componentType)
-            throws IllegalArgumentException, IOException, URISyntaxException {
+    private FunctionDetails validateUpdateRequestParamsWithPkgUrl(final String 
tenant,
+                                                                  final String 
namespace,
+                                                                  final String 
componentName,
+                                                                  final String 
functionPkgUrl,
+                                                                  final String 
functionDetailsJson,
+                                                                  final String 
componentConfigJson,
+                                                                  final 
ComponentType componentType)
+            throws IllegalArgumentException, IOException {
         if 
(!org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(functionPkgUrl))
 {
             throw new IllegalArgumentException("Function Package url is not 
valid. supported url (http/https/file)");
         }
@@ -1267,9 +1325,14 @@ public abstract class ComponentImpl {
         return functionDetails;
     }
 
-    private FunctionDetails validateUpdateRequestParams(String tenant, String 
namespace, String componentName,
-                                                        File 
uploadedInputStreamAsFile, FormDataContentDisposition fileDetail, String 
functionDetailsJson,
-                                                        String 
componentConfigJson, ComponentType componentType)
+    private FunctionDetails validateUpdateRequestParams(final String tenant,
+                                                        final String namespace,
+                                                        final String 
componentName,
+                                                        final File 
uploadedInputStreamAsFile,
+                                                        final 
FormDataContentDisposition fileDetail,
+                                                        final String 
functionDetailsJson,
+                                                        final String 
componentConfigJson,
+                                                        final ComponentType 
componentType)
             throws IllegalArgumentException, IOException {
 
         FunctionDetails functionDetails = validateUpdateRequestParams(tenant, 
namespace, componentName,
@@ -1281,9 +1344,12 @@ public abstract class ComponentImpl {
         return functionDetails;
     }
 
-    private FunctionDetails 
validateUpdateRequestParamsWithExistingMetadata(String tenant, String 
namespace, String componentName,
-                                                                            
PackageLocationMetaData packageLocationMetaData,
-                                                                            
String componentConfigJson, ComponentType componentType) throws Exception {
+    private FunctionDetails 
validateUpdateRequestParamsWithExistingMetadata(final String tenant,
+                                                                            
final String namespace,
+                                                                            
final String componentName,
+                                                                            
final PackageLocationMetaData packageLocationMetaData,
+                                                                            
final String componentConfigJson,
+                                                                            
final ComponentType componentType) throws Exception {
         File tmpFile = File.createTempFile("functions", null);
         tmpFile.deleteOnExit();
         Utils.downloadFromBookkeeper(worker().getDlogNamespace(), tmpFile, 
packageLocationMetaData.getPackagePath());
@@ -1291,7 +1357,7 @@ public abstract class ComponentImpl {
                 null, componentConfigJson, componentType, null, tmpFile);
     }
 
-    private static File dumpToTmpFile(InputStream uploadedInputStream) {
+    private static File dumpToTmpFile(final InputStream uploadedInputStream) {
         try {
             File tmpFile = File.createTempFile("functions", null);
             tmpFile.deleteOnExit();
@@ -1302,7 +1368,10 @@ public abstract class ComponentImpl {
         }
     }
 
-    private void validateGetFunctionStateParams(String tenant, String 
namespace, String functionName, String key)
+    private void validateGetFunctionStateParams(final String tenant,
+                                                final String namespace,
+                                                final String functionName,
+                                                final String key)
             throws IllegalArgumentException {
 
         if (tenant == null) {
@@ -1355,9 +1424,14 @@ public abstract class ComponentImpl {
         return null;
     }
 
-    private FunctionDetails validateUpdateRequestParams(String tenant, String 
namespace, String componentName,
-                                                        String 
functionDetailsJson, String componentConfigJson, ComponentType componentType,
-                                                        String functionPkgUrl, 
File uploadedInputStreamAsFile) throws IOException {
+    private FunctionDetails validateUpdateRequestParams(final String tenant,
+                                                        final String namespace,
+                                                        final String 
componentName,
+                                                        final String 
functionDetailsJson,
+                                                        final String 
componentConfigJson,
+                                                        final ComponentType 
componentType,
+                                                        final String 
functionPkgUrl,
+                                                        final File 
uploadedInputStreamAsFile) throws IOException {
         if (tenant == null) {
             throw new IllegalArgumentException("Tenant is not provided");
         }
@@ -1370,7 +1444,7 @@ public abstract class ComponentImpl {
 
         if (componentType.equals(FUNCTION) && !isEmpty(componentConfigJson)) {
             FunctionConfig functionConfig = new 
Gson().fromJson(componentConfigJson, FunctionConfig.class);
-            // The rest end points take precendence over whatever is there in 
functionconfig
+            // The rest end points take precedence over whatever is there in 
functionconfig
             functionConfig.setTenant(tenant);
             functionConfig.setNamespace(namespace);
             functionConfig.setName(componentName);
@@ -1381,7 +1455,7 @@ public abstract class ComponentImpl {
         if (componentType.equals(SOURCE)) {
             Path archivePath = null;
             SourceConfig sourceConfig = new 
Gson().fromJson(componentConfigJson, SourceConfig.class);
-            // The rest end points take precendence over whatever is there in 
sourceconfig
+            // The rest end points take precedence over whatever is there in 
sourceconfig
             sourceConfig.setTenant(tenant);
             sourceConfig.setNamespace(namespace);
             sourceConfig.setName(componentName);
@@ -1403,7 +1477,7 @@ public abstract class ComponentImpl {
         if (componentType.equals(SINK)) {
             Path archivePath = null;
             SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, 
SinkConfig.class);
-            // The rest end points take precendence over whatever is there in 
sinkConfig
+            // The rest end points take precedence over whatever is there in 
sinkConfig
             sinkConfig.setTenant(tenant);
             sinkConfig.setNamespace(namespace);
             sinkConfig.setName(componentName);
@@ -1562,8 +1636,14 @@ public abstract class ComponentImpl {
         return TypeResolver.resolveRawArgument(funClass, loadedClass);
     }
 
-    private void validateTriggerRequestParams(String tenant, String namespace, 
String functionName, String topic,
-                                              String input, InputStream 
uploadedInputStream) {
+    private void validateTriggerRequestParams(final String tenant,
+                                              final String namespace,
+                                              final String functionName,
+                                              final String topic,
+                                              final String input,
+                                              final InputStream 
uploadedInputStream) {
+        // Note : Checking topic is not required it can be null
+
         if (tenant == null) {
             throw new IllegalArgumentException("Tenant is not provided");
         }
@@ -1660,8 +1740,10 @@ public abstract class ComponentImpl {
         }
     }
 
-    protected void componentInstanceStatusRequestValidate (final String 
tenant, final String namespace,
-                                                           final String 
componentName, int instanceId) {
+    protected void componentInstanceStatusRequestValidate (final String tenant,
+                                                           final String 
namespace,
+                                                           final String 
componentName,
+                                                           final int 
instanceId) {
         componentStatusRequestValidate(tenant, namespace, componentName);
 
         FunctionMetaDataManager functionMetaDataManager = 
worker().getFunctionMetaDataManager();
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 9a6f739..f3e41a1 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -24,13 +24,11 @@ import 
org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.FunctionStatus;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.RestException;
 
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response;
-import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
 import java.util.LinkedList;
@@ -155,7 +153,10 @@ public class FunctionsImpl extends ComponentImpl {
         }
 
         @Override
-        public FunctionStatus getStatusExternal(String tenant, String 
namespace, String name, int parallelism) {
+        public FunctionStatus getStatusExternal(final String tenant,
+                                                final String namespace,
+                                                final String name,
+                                                final int parallelism) {
             FunctionStatus functionStatus = new FunctionStatus();
             for (int i = 0; i < parallelism; ++i) {
                 
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData
@@ -177,7 +178,7 @@ public class FunctionsImpl extends ComponentImpl {
         }
 
         @Override
-        public FunctionStatus emptyStatus(int parallelism) {
+        public FunctionStatus emptyStatus(final int parallelism) {
             FunctionStatus functionStatus = new FunctionStatus();
             functionStatus.setNumInstances(parallelism);
             functionStatus.setNumRunning(0);
@@ -209,8 +210,11 @@ public class FunctionsImpl extends ComponentImpl {
      * @param instanceId the function instance id
      * @return the function status
      */
-    public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
getFunctionInstanceStatus(final String tenant, final String namespace, final 
String componentName,
-                                                                               
                       final String instanceId, URI uri) throws IOException {
+    public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
getFunctionInstanceStatus(final String tenant,
+                                                                               
                       final String namespace,
+                                                                               
                       final String componentName,
+                                                                               
                       final String instanceId,
+                                                                               
                       final URI uri) {
 
         // validate parameters
         componentInstanceStatusRequestValidate(tenant, namespace, 
componentName, Integer.parseInt(instanceId));
@@ -237,8 +241,10 @@ public class FunctionsImpl extends ComponentImpl {
      * @return a list of function statuses
      * @throws PulsarAdminException
      */
-    public FunctionStatus getFunctionStatus(final String tenant, final String 
namespace, final String componentName,
-                                            URI uri) {
+    public FunctionStatus getFunctionStatus(final String tenant,
+                                            final String namespace,
+                                            final String componentName,
+                                            final URI uri) {
 
         // validate parameters
         componentStatusRequestValidate(tenant, namespace, componentName);
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java
index b94dfd8..9d7b316 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.functions.worker.rest.api;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
@@ -38,7 +37,7 @@ public class FunctionsMetricsResource extends 
FunctionApiResource {
     @Path("metrics")
     @GET
     @Produces(MediaType.TEXT_PLAIN)
-    public Response getMetrics() throws JsonProcessingException {
+    public Response getMetrics() {
 
         WorkerService workerService = get();
 
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
index 6d70fbb..862aaf6 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
@@ -24,13 +24,11 @@ import 
org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.SinkStatus;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.RestException;
 
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response;
-import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
 import java.util.LinkedList;
@@ -123,13 +121,18 @@ public class SinkImpl extends ComponentImpl {
         }
 
         @Override
-        public SinkStatus getStatus(String tenant, String namespace, String 
name, Collection<Function.Assignment> assignments, URI uri) throws 
PulsarAdminException {
+        public SinkStatus getStatus(final String tenant,
+                                    final String namespace,
+                                    final String name,
+                                    final Collection<Function.Assignment> 
assignments,
+                                    final URI uri) throws PulsarAdminException 
{
             SinkStatus sinkStatus = new SinkStatus();
             for (Function.Assignment assignment : assignments) {
                 boolean isOwner = 
worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId());
                 SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
sinkInstanceStatusData;
                 if (isOwner) {
-                    sinkInstanceStatusData = 
getComponentInstanceStatus(tenant, namespace, name, 
assignment.getInstance().getInstanceId(), null);
+                    sinkInstanceStatusData = getComponentInstanceStatus(tenant,
+                            namespace, name, 
assignment.getInstance().getInstanceId(), null);
                 } else {
                     sinkInstanceStatusData = 
worker().getFunctionAdmin().sink().getSinkStatus(
                             
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
@@ -154,7 +157,10 @@ public class SinkImpl extends ComponentImpl {
         }
 
         @Override
-        public SinkStatus getStatusExternal (String tenant, String namespace, 
String name, int parallelism) {
+        public SinkStatus getStatusExternal(final String tenant,
+                                            final String namespace,
+                                            final String name,
+                                            final int parallelism) {
             SinkStatus sinkStatus = new SinkStatus();
             for (int i = 0; i < parallelism; ++i) {
                 SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
sinkInstanceStatusData
@@ -176,7 +182,7 @@ public class SinkImpl extends ComponentImpl {
         }
 
         @Override
-        public SinkStatus emptyStatus(int parallelism) {
+        public SinkStatus emptyStatus(final int parallelism) {
             SinkStatus sinkStatus = new SinkStatus();
             sinkStatus.setNumInstances(parallelism);
             sinkStatus.setNumRunning(0);
@@ -200,9 +206,11 @@ public class SinkImpl extends ComponentImpl {
         super(workerServiceSupplier, ComponentType.SINK);
     }
 
-    public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
getSinkInstanceStatus(
-            String tenant, String namespace, String sinkName, String 
instanceId, URI uri)
-            throws IOException {
+    public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
getSinkInstanceStatus(final String tenant,
+                                                                               
       final String namespace,
+                                                                               
       final String sinkName,
+                                                                               
       final String instanceId,
+                                                                               
       final URI uri) {
 
         // validate parameters
         componentInstanceStatusRequestValidate(tenant, namespace, sinkName, 
Integer.parseInt(instanceId));
@@ -221,9 +229,10 @@ public class SinkImpl extends ComponentImpl {
         return sinkInstanceStatusData;
     }
 
-    public SinkStatus getSinkStatus(
-            final String tenant, final String namespace,
-            final String componentName, URI uri) {
+    public SinkStatus getSinkStatus(final String tenant,
+                                    final String namespace,
+                                    final String componentName,
+                                    final URI uri) {
 
         // validate parameters
         componentStatusRequestValidate(tenant, namespace, componentName);
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
index 8c3c988..d915bb0 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
@@ -24,13 +24,11 @@ import 
org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.SourceStatus;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.RestException;
 
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response;
-import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
 import java.util.LinkedList;
@@ -122,7 +120,11 @@ public class SourceImpl extends ComponentImpl {
         }
 
         @Override
-        public SourceStatus getStatus(String tenant, String namespace, String 
name, Collection<Function.Assignment> assignments, URI uri) throws 
PulsarAdminException {
+        public SourceStatus getStatus(final String tenant,
+                                      final String namespace,
+                                      final String name,
+                                      final Collection<Function.Assignment> 
assignments,
+                                      final URI uri) throws 
PulsarAdminException {
             SourceStatus sourceStatus = new SourceStatus();
             for (Function.Assignment assignment : assignments) {
                 boolean isOwner = 
worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId());
@@ -153,7 +155,10 @@ public class SourceImpl extends ComponentImpl {
         }
 
         @Override
-        public SourceStatus getStatusExternal(String tenant, String namespace, 
String name, int parallelism) {
+        public SourceStatus getStatusExternal(final String tenant,
+                                              final String namespace,
+                                              final String name,
+                                              final int parallelism) {
             SourceStatus sinkStatus = new SourceStatus();
             for (int i = 0; i < parallelism; ++i) {
                 SourceStatus.SourceInstanceStatus.SourceInstanceStatusData 
sourceInstanceStatusData
@@ -175,7 +180,7 @@ public class SourceImpl extends ComponentImpl {
         }
 
         @Override
-        public SourceStatus emptyStatus(int parallelism) {
+        public SourceStatus emptyStatus(final int parallelism) {
             SourceStatus sourceStatus = new SourceStatus();
             sourceStatus.setNumInstances(parallelism);
             sourceStatus.setNumRunning(0);
@@ -199,8 +204,10 @@ public class SourceImpl extends ComponentImpl {
         super(workerServiceSupplier, ComponentType.SOURCE);
     }
 
-    public SourceStatus getSourceStatus(final String tenant, final String 
namespace,
-                                        final String componentName, URI uri) 
throws IOException {
+    public SourceStatus getSourceStatus(final String tenant,
+                                        final String namespace,
+                                        final String componentName,
+                                        final URI uri) {
         // validate parameters
         componentStatusRequestValidate(tenant, namespace, componentName);
 
@@ -217,8 +224,11 @@ public class SourceImpl extends ComponentImpl {
         return sourceStatus;
     }
 
-    public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData 
getSourceInstanceStatus(
-            String tenant, String namespace, String sourceName, String 
instanceId, URI uri) {
+    public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData 
getSourceInstanceStatus(final String tenant,
+                                                                               
               final String namespace,
+                                                                               
               final String sourceName,
+                                                                               
               final String instanceId,
+                                                                               
               final URI uri) {
         // validate parameters
         componentInstanceStatusRequestValidate(tenant, namespace, sourceName, 
Integer.parseInt(instanceId));
 
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
index bafb7a9..56c945d 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
@@ -27,14 +27,20 @@ import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.worker.*;
+import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
+import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
+import org.apache.pulsar.functions.worker.MembershipManager;
+import org.apache.pulsar.functions.worker.Utils;
+import org.apache.pulsar.functions.worker.WorkerService;
 
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
-import java.io.*;
-import java.util.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.function.Supplier;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -118,11 +124,11 @@ public class WorkerImpl {
                 .build();
     }
 
-    public boolean isSuperUser(String clientRole) {
+    public boolean isSuperUser(final String clientRole) {
         return clientRole != null && 
worker().getWorkerConfig().getSuperUserRoles().contains(clientRole);
     }
 
-    public List<org.apache.pulsar.common.stats.Metrics> 
getWorkerMetrics(String clientRole) throws IOException {
+    public List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrics(final 
String clientRole) {
         if (worker().getWorkerConfig().isAuthorizationEnabled() && 
!isSuperUser(clientRole)) {
             log.error("Client [{}] is not admin and authorized to get 
function-stats", clientRole);
             throw new 
WebApplicationException(Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
@@ -135,12 +141,12 @@ public class WorkerImpl {
         if (!isWorkerServiceAvailable()) {
             throw new WebApplicationException(
                     
Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
-                            .entity(new ErrorData("Function worker service is 
not avaialable")).build());
+                            .entity(new ErrorData("Function worker service is 
not available")).build());
         }
         return worker().getMetricsGenerator().generate();
     }
 
-    public Response getFunctionsMetrics(String clientRole) throws IOException {
+    public Response getFunctionsMetrics(final String clientRole) {
         if (worker().getWorkerConfig().isAuthorizationEnabled() && 
!isSuperUser(clientRole)) {
             log.error("Client [{}] is not admin and authorized to get 
function-stats", clientRole);
             return 
Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
@@ -156,7 +162,7 @@ public class WorkerImpl {
         public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData 
metrics;
     }
 
-    private Response getFunctionsMetrics() throws IOException {
+    private Response getFunctionsMetrics() {
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
         }
@@ -171,7 +177,8 @@ public class WorkerImpl {
             String fullyQualifiedInstanceName = entry.getKey();
             FunctionRuntimeInfo functionRuntimeInfo = entry.getValue();
 
-            FunctionStats.FunctionInstanceStats functionInstanceStats = 
Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo);
+            FunctionStats.FunctionInstanceStats functionInstanceStats =
+                    Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, 
functionRuntimeInfo);
 
             WorkerFunctionInstanceStats workerFunctionInstanceStats = new 
WorkerFunctionInstanceStats();
             workerFunctionInstanceStats.setName(fullyQualifiedInstanceName);

Reply via email to