kk-0619 opened a new issue #7036:
URL: https://github.com/apache/pulsar/issues/7036
**Describe the bug**
The state storage function stops working after data has been sent into state
storage for about an hour.
**To Reproduce**
Steps to reproduce the behavior:
1. Run the **ProducerTest.java**
`package pulsar;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import pulsar.core.PulsarProducer;
import pulsar.models.Occupancy_Offset;
/**
 * Test driver that publishes Occupancy_Offset records in an endless loop.
 */
public class ProducerTest {
/**
 * Builds the producer and sends one record per loop iteration, forever.
 *
 * @param args not read by this method
 * @throws Exception anything thrown by producer setup or by send is propagated
 */
public static void main(String[] args) throws Exception {
// PulsarProducer is a project class not shown here; presumably its arguments
// are the broker service URL and the topic name - TODO confirm.
PulsarProducer<Occupancy_Offset> pulsarProducer = new
PulsarProducer("pulsar://192.168.2.73:6650", "statetest")
.init(JSONSchema.of(Occupancy_Offset.class));
// A single instance is created once and then mutated and re-sent on every iteration.
Occupancy_Offset ocoff = new Occupancy_Offset();
Random rand = new Random();
// No delay or exit condition: records are produced as fast as send() returns.
while (true) {
// NOTE(review): the pattern quotes 'Z' as a literal, so "Z" is appended without
// any conversion to UTC; the formatter uses the JVM default time zone - confirm
// this is the intended timestamp.
ocoff.dateTime = new
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").format(new Date());
// nextInt(9) yields 0..8, so occupancy is a random value in 1..9.
ocoff.occupancy = rand.nextInt(9) + 1;
// offset is fixed at 1; the random variant is left commented out.
ocoff.offset = 1;//rand.nextInt(9) + 1;
System.out.println(ocoff);
pulsarProducer.send(ocoff);
}
}
}`
2. State function
`package pulsar;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.util.Calendar;
import java.util.Collections;
import java.util.HashMap;
import javax.management.loading.MLet;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.LocalRunner;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.json.simple.JSONArray;
import pulsar.models.EventRawData;
import pulsar.models.Occupancy_Offset;
/**
 * Pulsar Function that keeps running occupancy/offset totals as a JSON document
 * in function state under the key "testlaststate", and publishes a totals
 * record after handling each input.
 */
public class StateTestFunction implements Function<Occupancy_Offset, Void> {
/**
 * Adds one incoming record to the stored totals and publishes a totals message.
 *
 * @param occdata incoming record; its occupancy and offset are added to the stored totals
 * @param context function context used to read/write state and to publish the output
 * @return always null
 * @throws Exception declared in the signature; the body itself catches Exception and only prints it
 */
@Override
public Void process(Occupancy_Offset occdata, Context context) throws
Exception {
System.out.println("\n\n\n-----------------STATETEST-----------------\n\n\n");
try {
// Totals read from state; they stay 0 when nothing is stored yet or when reading fails.
int occupancydata = 0;
int offsetdata = 0;
// Used both as the state key and as the first argument of newOutputMessage below.
String laststatetest = "testlaststate";
JSONParser parser = new JSONParser();
try {
// Load the previously stored totals; a null result is treated as "no state yet".
ByteBuffer lastOccupancyState =
context.getState(laststatetest);
JSONObject jo = new JSONObject();
if (lastOccupancyState != null) {
// NOTE(review): array() returns the buffer's entire backing array regardless of
// position/limit and throws for read-only or direct buffers - confirm that the
// buffer returned by getState is array-backed and exactly sized.
jo = (JSONObject) parser.parse(new
String(lastOccupancyState.array(), "UTF-8"));
System.out.println("jo 1 : " + jo.toJSONString());
// get old occupancy
occupancydata =
Integer.valueOf(jo.get("occupancy").toString());
System.out.println("occupancydata : " + occupancydata);
offsetdata =
Integer.valueOf(jo.get("offset").toString());
System.out.println("offsetdata : " + offsetdata);
} else{
System.out.println("[" + laststatetest + "] NOT SET
YET");
}
// Add the incoming values to the stored totals and write the JSON back to state.
jo.put("occupancy", occupancydata + occdata.occupancy);
jo.put("offset", offsetdata + occdata.offset);
System.out.println("jo 2 : " + jo.toJSONString());
context.putState(laststatetest,
ByteBuffer.wrap(jo.toString().getBytes(UTF_8)));
} catch (Exception e) {
// Any failure while reading, parsing or writing state is only printed;
// execution continues below with whatever totals were read so far.
System.out.println("test Exception: " + e.getMessage());
e.printStackTrace();
}
// occupancydata/offsetdata are not reassigned after the addition above, so the
// log line and the published record carry the totals as read from state, i.e.
// before the current record was added.
System.out.println("NEW Occupancy[" + laststatetest + "] : " +
occupancydata + ", " + offsetdata);
Occupancy_Offset total = new Occupancy_Offset();
total.dateTime = occdata.dateTime;
total.occupancy = occupancydata;
total.offset = offsetdata;
// Publish the totals with a JSON schema; presumably the first argument is the
// destination topic name - TODO confirm against the Context API.
context.newOutputMessage(laststatetest,
JSONSchema.of(Occupancy_Offset.class))
.value(total)
.send();
} catch (NullPointerException npe) {
// Only the message is printed here (no stack trace), and processing returns normally.
System.out.println("NullPointerException error: " +
npe.getMessage());
} catch (Exception e) {
// Same as above: the failure is reduced to its message and not rethrown.
System.out.println("Exception error: " + e.getMessage());
}
System.out.println("-------------OUT-------------\n");
return null;
}
}`
3. Let both run for several minutes to an hour.
4. Data stops being uploaded to Presto.
**Models**
`import java.io.Serializable;
/**
 * Serializable data holder with a date-time string and two int fields.
 * It is imported by the state function shown in this report but not used in its body.
 */
public class EventRawData implements Serializable{
// Timestamp kept as text; the format is not shown in this report.
public String dateTime;
// presumably entry and exit counts - TODO confirm with the reporter
public int in;
public int out;
}`
`package pulsar.models;
import java.io.Serializable;
/**
 * Serializable record exchanged between the test producer and the state function:
 * a date-time string plus occupancy and offset values.
 */
public class Occupancy_Offset implements Serializable {
// Timestamp as text; the producer in this report fills it via SimpleDateFormat.
public String dateTime;
// Per-record occupancy value; the state function sums these into its stored total.
public int occupancy;
// Per-record offset value; the state function sums these into its stored total.
public int offset;
/** No-argument constructor; all fields keep their default values. */
public Occupancy_Offset() {
}
/**
 * Formats the record as dateTime|occupancy|offset.
 *
 * @return the three fields joined with the pipe character
 */
@Override
public String toString() {
return dateTime + "|" + occupancy + "|" + offset;
}
}`
**Expected behavior**
Data should keep being sent to the state storage and uploaded to Presto.
Actual behavior: data is uploaded to Presto at first, but after some time it
stops and no more data is uploaded to Presto.
**Screenshots**
1. Output after function stopped working

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org