sijie closed pull request #2370: add more debug logs with function name
reference
URL: https://github.com/apache/incubator-pulsar/pull/2370
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), GitHub would not otherwise
display the diff, so it is supplied below:
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
index 6abb5fbfc0..aad7d4a807 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
@@ -33,6 +33,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.utils.Utils;
import static
org.apache.pulsar.functions.proto.Function.FunctionDetails.Runtime.PYTHON;
@@ -63,8 +64,9 @@ public RuntimeSpawner(InstanceConfig instanceConfig,
}
public void start() throws Exception {
- log.info("RuntimeSpawner starting function {} - {}",
this.instanceConfig.getFunctionDetails().getName(),
- this.instanceConfig.getInstanceId());
+ FunctionDetails details = this.instanceConfig.getFunctionDetails();
+ log.info("{}/{}/{}-{} RuntimeSpawner starting function",
details.getTenant(), details.getNamespace(),
+ details.getName(), this.instanceConfig.getInstanceId());
if (instanceConfig.getFunctionDetails().getRuntime() == PYTHON
&& instanceConfig.getFunctionDetails().getSource() != null
@@ -82,10 +84,8 @@ public void start() throws Exception {
@Override
public void run() {
if (!runtime.isAlive()) {
- log.error("[{}-{}] Function Container is dead with
exception",
- instanceConfig.getFunctionDetails().getName(),
instanceConfig.getInstanceId(),
- runtime.getDeathException());
- log.error("Restarting...");
+ log.error("{}/{}/{}-{} Function Container is dead with
exception.. restarting", details.getTenant(),
+ details.getNamespace(), details.getName(),
runtime.getDeathException());
// Just for the sake of sanity, just destroy the
runtime
runtime.stop();
runtimeDeathException = runtime.getDeathException();
@@ -119,7 +119,8 @@ public void join() throws Exception {
try {
return Utils.printJson(msg);
} catch (IOException e) {
- throw new RuntimeException("Exception parsing getstatus", e);
+ throw new RuntimeException(
+ instanceConfig.getFunctionDetails().getName() + "
Exception parsing getstatus", e);
}
});
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index b3f30fdb95..f5a7e969be 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -56,6 +56,7 @@
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.FunctionDetailsOrBuilder;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
+import org.apache.pulsar.functions.proto.Function.Instance;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
@@ -99,7 +100,10 @@ public FunctionActioner(WorkerConfig workerConfig,
try {
startFunction(action.getFunctionRuntimeInfo());
} catch (Exception ex) {
- log.info("Error starting function", ex);
+ FunctionDetails details =
action.getFunctionRuntimeInfo().getFunctionInstance()
+
.getFunctionMetaData().getFunctionDetails();
+ log.info("{}/{}/{} Error starting function",
details.getTenant(), details.getNamespace(),
+ details.getName(), ex);
action.getFunctionRuntimeInfo().setStartupException(ex);
}
} else {
@@ -132,7 +136,8 @@ public void startFunction(FunctionRuntimeInfo
functionRuntimeInfo) throws Except
int instanceId =
functionRuntimeInfo.getFunctionInstance().getInstanceId();
FunctionDetails.Builder functionDetails =
FunctionDetails.newBuilder(functionMetaData.getFunctionDetails());
- log.info("Starting function {} - {} ...", functionDetails.getName(),
instanceId);
+ log.info("{}/{}/{}-{} Starting function ...",
functionDetails.getTenant(), functionDetails.getNamespace(),
+ functionDetails.getName(), instanceId);
File pkgFile = null;
String pkgLocation =
functionMetaData.getPackageLocation().getPackagePath();
@@ -164,7 +169,8 @@ public void startFunction(FunctionRuntimeInfo
functionRuntimeInfo) throws Except
instanceConfig.setMaxBufferedTuples(1024);
instanceConfig.setPort(org.apache.pulsar.functions.utils.Utils.findAvailablePort());
- log.info("start process with instance config {}", instanceConfig);
+ log.info("{}/{}/{}-{} start process with instance config {}",
functionDetails.getTenant(), functionDetails.getNamespace(),
+ functionDetails.getName(), instanceId, instanceConfig);
RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig,
pkgFile.getAbsolutePath(),
runtimeFactory, workerConfig.getInstanceLivenessCheckFreqMs());
@@ -175,6 +181,7 @@ public void startFunction(FunctionRuntimeInfo
functionRuntimeInfo) throws Except
private void downloadFile(File pkgFile, boolean isPkgUrlProvided,
FunctionMetaData functionMetaData, int instanceId) throws
FileNotFoundException, IOException {
+ FunctionDetails details = functionMetaData.getFunctionDetails();
File pkgDir = pkgFile.getParentFile();
if (pkgFile.exists()) {
@@ -194,7 +201,8 @@ private void downloadFile(File pkgFile, boolean
isPkgUrlProvided, FunctionMetaDa
}
String pkgLocationPath =
functionMetaData.getPackageLocation().getPackagePath();
boolean downloadFromHttp = isPkgUrlProvided &&
pkgLocationPath.startsWith(HTTP);
- log.info("Function package file {} will be downloaded from {}",
tempPkgFile,
+ log.info("{}/{}/{} Function package file {} will be downloaded from
{}", tempPkgFile, details.getTenant(),
+ details.getNamespace(), details.getName(),
downloadFromHttp ? pkgLocationPath :
functionMetaData.getPackageLocation());
if(downloadFromHttp) {
@@ -228,8 +236,9 @@ private void downloadFile(File pkgFile, boolean
isPkgUrlProvided, FunctionMetaDa
public void stopFunction(FunctionRuntimeInfo functionRuntimeInfo) {
Function.Instance instance = functionRuntimeInfo.getFunctionInstance();
FunctionMetaData functionMetaData = instance.getFunctionMetaData();
- log.info("Stopping function {} - {}...",
- functionMetaData.getFunctionDetails().getName(),
instance.getInstanceId());
+ FunctionDetails details = functionMetaData.getFunctionDetails();
+ log.info("{}/{}/{}-{} Stopping function...", details.getTenant(),
details.getNamespace(), details.getName(),
+ instance.getInstanceId());
if (functionRuntimeInfo.getRuntimeSpawner() != null) {
functionRuntimeInfo.getRuntimeSpawner().close();
functionRuntimeInfo.setRuntimeSpawner(null);
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
index 1c045d48c1..366eaba6a0 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
@@ -78,7 +78,8 @@ public void accept(Message<byte[]> msg) {
try {
assignmentsUpdate =
Request.AssignmentsUpdate.parseFrom(msg.getData());
} catch (IOException e) {
- log.error("Received bad assignment update at message {}",
msg.getMessageId(), e);
+ log.error("[{}] Received bad assignment update at message {}",
reader.getTopic(), msg.getMessageId(),
+ e);
// TODO: find a better way to handle bad request
throw new RuntimeException(e);
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index 29a322043a..9de92269e4 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -290,6 +290,10 @@ synchronized void
proccessDeregister(Request.ServiceRequest deregisterRequest) {
completeRequest(deregisterRequest, true);
needsScheduling = true;
} else {
+ if (log.isDebugEnabled()) {
+ log.debug("{}/{}/{} Ignoring outdated request version:
{}", tenant, namespace, functionName,
+
deregisterRequest.getFunctionMetaData().getVersion());
+ }
completeRequest(deregisterRequest, false,
"Request ignored because it is out of date. Please try
again.");
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services