[
https://issues.apache.org/jira/browse/GOBBLIN-1705?focusedWorklogId=807561&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-807561
]
ASF GitHub Bot logged work on GOBBLIN-1705:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 09/Sep/22 19:01
Start Date: 09/Sep/22 19:01
Worklog Time Spent: 10m
Work Description: ZihanLi58 commented on code in PR #3557:
URL: https://github.com/apache/gobblin/pull/3557#discussion_r967366821
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.util.HashSet;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
+
+
+/**
+ * A Flow Spec Store change monitor that uses {@link SpecStoreChangeEvent} schema to process Kafka messages received
+ * from the consumer service. This monitor responds to changes to flow specs (creations, updates, deletes) and acts as
+ * a connector between the API and execution layers of GaaS.
+ */
+@Slf4j
+public class SpecStoreChangeMonitor extends HighLevelConsumer {
+ protected HashSet<Long> timestampsSeenBefore;
+
+ @Inject
+ protected FlowCatalog flowCatalog;
+
+ @Inject
+ protected GobblinServiceJobScheduler scheduler;
+
+ @Inject
+ public SpecStoreChangeMonitor(String topic, Config config, int numThreads) {
+ super(topic, config, numThreads);
+ this.timestampsSeenBefore = new HashSet();
+ }
+
+ @Override
+ protected void processMessage(DecodeableKafkaRecord message) {
+ String specUri = (String) message.getKey();
+ SpecStoreChangeEvent value = (SpecStoreChangeEvent) message.getValue();
+
+ Long timestamp = value.getTimestamp();
+ String operation = value.getOperationType().name();
+ log.info("Processing message with specUri is {} timestamp is {} operation is {}", specUri, timestamp, operation);
+
+ // If we've already processed a message with this timestamp before then skip duplicate message
+ if (timestampsSeenBefore.contains(timestamp)) {
Review Comment:
If you want to filter out duplicates, I think timestamp + specUri should be the key; timestamp alone seems risky.
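The reviewer's point can be sketched as follows. This is an illustrative standalone class (not the actual monitor code): the dedup set is keyed on the combination of specUri and timestamp, so two distinct specs changed in the same millisecond are not mistaken for duplicates of each other.

```java
import java.util.HashSet;
import java.util.Set;

public class DedupKeySketch {
  // Keys are "specUri_timestamp", per the reviewer's suggestion, rather than
  // bare timestamps.
  private final Set<String> changesSeenBefore = new HashSet<>();

  // Returns true the first time a (specUri, timestamp) pair is seen,
  // false for an exact duplicate.
  public boolean markIfNew(String specUri, long timestamp) {
    return changesSeenBefore.add(specUri + "_" + timestamp);
  }
}
```

With timestamp-only keys, the third call below would wrongly be treated as a duplicate of the first.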
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java:
##########
@@ -119,6 +122,8 @@ public GobblinServiceConfiguration(String serviceName, String serviceId, Config
ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY, true);
this.isTopologySpecFactoryEnabled =
ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY, true);
+ this.isSpecStoreChangeMonitorEnabled =
+ ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_SPEC_STORE_CHANGE_MONITOR_ENABLED_KEY, true);
Review Comment:
by default, this should be false?
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -457,4 +458,15 @@ public void remove(URI uri, Properties headers, boolean triggerListener) {
public Object getSyncObject(String specUri) {
return this.specSyncObjects.getOrDefault(specUri, null);
}
+
+ public Spec getSpecFromStore(String specUri) {
+ try {
+ URI uri = new URI(specUri);
+ return specStore.getSpec(uri);
+ } catch (SpecNotFoundException e) {
+ throw new RuntimeException("Could not find Spec from Spec Store for URI: " + specUri, e);
Review Comment:
In this case, I don't think we want to throw an exception. If it's a delete operation, the spec is not in the catalog.
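A minimal sketch of the behavior the reviewer is asking for, using a plain `Map` as a stand-in for the actual SpecStore (class and method names here are illustrative, not Gobblin's real API): a missing spec is a legitimate state after a DELETE event, so the lookup returns null and lets the caller decide, instead of throwing.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;

public class SpecLookupSketch {
  private final Map<String, Object> store;  // stand-in for the SpecStore

  public SpecLookupSketch(Map<String, Object> store) {
    this.store = store;
  }

  // Returns the spec, or null if absent (e.g. the spec was already deleted).
  public Object getSpecFromStore(String specUri) {
    try {
      URI uri = new URI(specUri);
      Object spec = store.get(uri.toString());  // stand-in for specStore.getSpec(uri)
      if (spec == null) {
        // A DELETE event arrives after removal; absence is expected, not fatal.
        System.out.println("Spec not found for URI " + specUri + "; returning null");
      }
      return spec;
    } catch (URISyntaxException e) {
      // A malformed URI is still a genuine error worth surfacing.
      throw new RuntimeException("Malformed spec URI: " + specUri, e);
    }
  }
}
```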
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.util.HashSet;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
+
+
+/**
+ * A Flow Spec Store change monitor that uses {@link SpecStoreChangeEvent} schema to process Kafka messages received
+ * from the consumer service. This monitor responds to changes to flow specs (creations, updates, deletes) and acts as
+ * a connector between the API and execution layers of GaaS.
+ */
+@Slf4j
+public class SpecStoreChangeMonitor extends HighLevelConsumer {
+ protected HashSet<Long> timestampsSeenBefore;
+
+ @Inject
+ protected FlowCatalog flowCatalog;
+
+ @Inject
+ protected GobblinServiceJobScheduler scheduler;
+
+ @Inject
+ public SpecStoreChangeMonitor(String topic, Config config, int numThreads) {
+ super(topic, config, numThreads);
+ this.timestampsSeenBefore = new HashSet();
+ }
+
+ @Override
+ protected void processMessage(DecodeableKafkaRecord message) {
+ String specUri = (String) message.getKey();
+ SpecStoreChangeEvent value = (SpecStoreChangeEvent) message.getValue();
+
+ Long timestamp = value.getTimestamp();
+ String operation = value.getOperationType().name();
+ log.info("Processing message with specUri is {} timestamp is {} operation is {}", specUri, timestamp, operation);
+
+ // If we've already processed a message with this timestamp before then skip duplicate message
+ if (timestampsSeenBefore.contains(timestamp)) {
+ return;
+ }
+
+ Spec spec = this.flowCatalog.getSpecFromStore(specUri);
Review Comment:
What happens if it's a delete? You will get null or even an exception in this case.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.util.HashSet;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
+
+
+/**
+ * A Flow Spec Store change monitor that uses {@link SpecStoreChangeEvent} schema to process Kafka messages received
+ * from the consumer service. This monitor responds to changes to flow specs (creations, updates, deletes) and acts as
+ * a connector between the API and execution layers of GaaS.
+ */
+@Slf4j
+public class SpecStoreChangeMonitor extends HighLevelConsumer {
+ protected HashSet<Long> timestampsSeenBefore;
+
+ @Inject
+ protected FlowCatalog flowCatalog;
+
+ @Inject
+ protected GobblinServiceJobScheduler scheduler;
+
+ @Inject
+ public SpecStoreChangeMonitor(String topic, Config config, int numThreads) {
+ super(topic, config, numThreads);
+ this.timestampsSeenBefore = new HashSet();
+ }
+
+ @Override
+ protected void processMessage(DecodeableKafkaRecord message) {
+ String specUri = (String) message.getKey();
+ SpecStoreChangeEvent value = (SpecStoreChangeEvent) message.getValue();
+
+ Long timestamp = value.getTimestamp();
+ String operation = value.getOperationType().name();
+ log.info("Processing message with specUri is {} timestamp is {} operation is {}", specUri, timestamp, operation);
+
+ // If we've already processed a message with this timestamp before then skip duplicate message
+ if (timestampsSeenBefore.contains(timestamp)) {
+ return;
+ }
+
+ Spec spec = this.flowCatalog.getSpecFromStore(specUri);
+
+ // Call respective action for the type of change received
+ if (operation == "CREATE") {
+ scheduler.onAddSpec(spec);
Review Comment:
What will happen if we fail to schedule it? It seems we silently fail and skip; I'd suggest emitting some metrics here to indicate something went wrong, so that we can monitor the behavior.
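One way to implement the reviewer's suggestion, sketched with a plain `AtomicLong` as a stand-in for whatever metrics library GaaS actually wires in (Gobblin has its own metrics infrastructure; the names below are illustrative): wrap the scheduler call, count failures, and log them rather than letting them vanish.

```java
import java.util.concurrent.atomic.AtomicLong;

public class ScheduleWithMetricsSketch {
  // Stand-in for a real metrics counter (e.g. a Dropwizard/Gobblin Counter).
  final AtomicLong schedulingFailures = new AtomicLong();

  // Runs the scheduling action; on failure, increments the failure metric and
  // logs instead of failing silently.
  public void onAddSpec(Runnable scheduleAction, String specUri) {
    try {
      scheduleAction.run();
    } catch (RuntimeException e) {
      schedulingFailures.incrementAndGet();
      System.err.println("Failed to schedule spec " + specUri + ": " + e.getMessage());
    }
  }
}
```

An alerting rule on the failure counter then makes silent skips visible on a dashboard.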
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.util.HashSet;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
+
+
+/**
+ * A Flow Spec Store change monitor that uses {@link SpecStoreChangeEvent} schema to process Kafka messages received
+ * from the consumer service. This monitor responds to changes to flow specs (creations, updates, deletes) and acts as
+ * a connector between the API and execution layers of GaaS.
+ */
+@Slf4j
+public class SpecStoreChangeMonitor extends HighLevelConsumer {
+ protected HashSet<Long> timestampsSeenBefore;
+
+ @Inject
+ protected FlowCatalog flowCatalog;
+
+ @Inject
+ protected GobblinServiceJobScheduler scheduler;
+
+ @Inject
+ public SpecStoreChangeMonitor(String topic, Config config, int numThreads) {
+ super(topic, config, numThreads);
+ this.timestampsSeenBefore = new HashSet();
+ }
+
+ @Override
+ protected void processMessage(DecodeableKafkaRecord message) {
+ String specUri = (String) message.getKey();
+ SpecStoreChangeEvent value = (SpecStoreChangeEvent) message.getValue();
+
+ Long timestamp = value.getTimestamp();
+ String operation = value.getOperationType().name();
+ log.info("Processing message with specUri is {} timestamp is {} operation is {}", specUri, timestamp, operation);
+
+ // If we've already processed a message with this timestamp before then skip duplicate message
+ if (timestampsSeenBefore.contains(timestamp)) {
+ return;
+ }
+
+ Spec spec = this.flowCatalog.getSpecFromStore(specUri);
+
+ // Call respective action for the type of change received
+ if (operation == "CREATE") {
+ scheduler.onAddSpec(spec);
+ } else if (operation == "INSERT") {
+ scheduler.onUpdateSpec(spec);
+ } else if (operation == "DELETE") {
+ scheduler.onDeleteSpec(spec.getUri(), spec.getVersion());
Review Comment:
Same question as above: when it's a delete operation, you will not have the spec info in the spec store.
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -457,4 +458,15 @@ public void remove(URI uri, Properties headers, boolean triggerListener) {
public Object getSyncObject(String specUri) {
return this.specSyncObjects.getOrDefault(specUri, null);
}
+
+ public Spec getSpecFromStore(String specUri) {
Review Comment:
We already have the method getSpecWrapper; why do we create a new one?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.util.HashSet;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
+
+
+/**
+ * A Flow Spec Store change monitor that uses {@link SpecStoreChangeEvent} schema to process Kafka messages received
+ * from the consumer service. This monitor responds to changes to flow specs (creations, updates, deletes) and acts as
+ * a connector between the API and execution layers of GaaS.
+ */
+@Slf4j
+public class SpecStoreChangeMonitor extends HighLevelConsumer {
+ protected HashSet<Long> timestampsSeenBefore;
+
+ @Inject
+ protected FlowCatalog flowCatalog;
+
+ @Inject
+ protected GobblinServiceJobScheduler scheduler;
+
+ @Inject
+ public SpecStoreChangeMonitor(String topic, Config config, int numThreads) {
+ super(topic, config, numThreads);
+ this.timestampsSeenBefore = new HashSet();
+ }
+
+ @Override
+ protected void processMessage(DecodeableKafkaRecord message) {
+ String specUri = (String) message.getKey();
+ SpecStoreChangeEvent value = (SpecStoreChangeEvent) message.getValue();
+
+ Long timestamp = value.getTimestamp();
+ String operation = value.getOperationType().name();
+ log.info("Processing message with specUri is {} timestamp is {} operation is {}", specUri, timestamp, operation);
+
+ // If we've already processed a message with this timestamp before then skip duplicate message
+ if (timestampsSeenBefore.contains(timestamp)) {
+ return;
+ }
+
+ Spec spec = this.flowCatalog.getSpecFromStore(specUri);
+
+ // Call respective action for the type of change received
+ if (operation == "CREATE") {
+ scheduler.onAddSpec(spec);
+ } else if (operation == "INSERT") {
+ scheduler.onUpdateSpec(spec);
+ } else if (operation == "DELETE") {
+ scheduler.onDeleteSpec(spec.getUri(), spec.getVersion());
+ } else {
+ log.warn("Received unsupported change type of operation {}. Expected values to be in [CREATE, INSERT, DELETE]", operation);
+ return;
+ }
+
+ timestampsSeenBefore.add(timestamp);
Review Comment:
How is this processMessage method called: in a single thread, or can it be multi-threaded? If it's multi-threaded, it's still possible to process the same message at the same time. Do we have any assumption that prevents this? If so, put it in the comments.
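The race the reviewer describes is the classic check-then-act problem: with a plain `HashSet`, two threads can both pass the `contains()` check before either calls `add()`. One fix, sketched below with illustrative names, is a concurrent set whose `add()` is itself the atomic "was this new?" test, collapsing the two steps into one.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentDedupSketch {
  // A thread-safe set view backed by ConcurrentHashMap.
  private final Set<Long> timestampsSeen = ConcurrentHashMap.newKeySet();

  // add() atomically returns false if the timestamp was already present, so
  // exactly one of any set of concurrent callers sees true for a given value.
  public boolean shouldProcess(long timestamp) {
    return timestampsSeen.add(timestamp);
  }
}
```

If the consumer framework guarantees single-threaded dispatch per partition instead, that assumption is what the reviewer is asking to have documented in a comment.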
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.util.HashSet;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
+
+
+/**
+ * A Flow Spec Store change monitor that uses {@link SpecStoreChangeEvent} schema to process Kafka messages received
+ * from the consumer service. This monitor responds to changes to flow specs (creations, updates, deletes) and acts as
+ * a connector between the API and execution layers of GaaS.
+ */
+@Slf4j
+public class SpecStoreChangeMonitor extends HighLevelConsumer {
+ protected HashSet<Long> timestampsSeenBefore;
+
+ @Inject
+ protected FlowCatalog flowCatalog;
+
+ @Inject
+ protected GobblinServiceJobScheduler scheduler;
+
+ @Inject
+ public SpecStoreChangeMonitor(String topic, Config config, int numThreads) {
+ super(topic, config, numThreads);
+ this.timestampsSeenBefore = new HashSet();
Review Comment:
I'd suggest using a cache in this case. The size of the HashSet will keep increasing while the service is running, which might eventually lead to an OOM issue.
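The unbounded-growth concern can be addressed with any size- or time-bounded cache; in Gobblin's codebase Guava's `CacheBuilder` (with `maximumSize` or `expireAfterWrite`) would be the idiomatic choice. To keep this sketch dependency-free, it uses `LinkedHashMap`'s `removeEldestEntry` hook as a simple LRU bound; the class name and the bound are illustrative.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BoundedDedupCacheSketch {
  private static final int MAX_ENTRIES = 10_000;  // illustrative bound

  // Access-ordered LinkedHashMap that evicts its least-recently-used entry
  // once the bound is exceeded, keeping memory flat over the service lifetime.
  private final Map<Long, Boolean> seen =
      new LinkedHashMap<Long, Boolean>(16, 0.75f, true) {
        @Override
        protected boolean removeEldestEntry(Map.Entry<Long, Boolean> eldest) {
          return size() > MAX_ENTRIES;
        }
      };

  // Returns true the first time a timestamp is seen (while it is still cached).
  public boolean markIfNew(long timestamp) {
    return seen.put(timestamp, Boolean.TRUE) == null;
  }
}
```

The trade-off versus an unbounded set: a duplicate older than the eviction horizon can slip through, which is usually acceptable for near-real-time dedup of a change stream.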
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.util.HashSet;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
+
+
+/**
+ * A Flow Spec Store change monitor that uses {@link SpecStoreChangeEvent} schema to process Kafka messages received
+ * from the consumer service. This monitor responds to changes to flow specs (creations, updates, deletes) and acts as
+ * a connector between the API and execution layers of GaaS.
+ */
+@Slf4j
+public class SpecStoreChangeMonitor extends HighLevelConsumer {
+ protected HashSet<Long> timestampsSeenBefore;
+
+ @Inject
+ protected FlowCatalog flowCatalog;
+
+ @Inject
+ protected GobblinServiceJobScheduler scheduler;
+
+ @Inject
+ public SpecStoreChangeMonitor(String topic, Config config, int numThreads) {
+ super(topic, config, numThreads);
+ this.timestampsSeenBefore = new HashSet();
+ }
+
+ @Override
+ protected void processMessage(DecodeableKafkaRecord message) {
+ String specUri = (String) message.getKey();
+ SpecStoreChangeEvent value = (SpecStoreChangeEvent) message.getValue();
+
+ Long timestamp = value.getTimestamp();
+ String operation = value.getOperationType().name();
+ log.info("Processing message with specUri is {} timestamp is {} operation is {}", specUri, timestamp, operation);
+
+ // If we've already processed a message with this timestamp before then skip duplicate message
+ if (timestampsSeenBefore.contains(timestamp)) {
+ return;
+ }
+
+ Spec spec = this.flowCatalog.getSpecFromStore(specUri);
+
+ // Call respective action for the type of change received
+ if (operation == "CREATE") {
Review Comment:
I see in the schema you define those as "insert", "update", and "delete"; this seems incorrect here?
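Beyond the naming mismatch the reviewer flags, the quoted diff compares strings with `==`, which tests reference identity rather than equality. A sketch of the safer pattern: a `switch` on the operation string (which compares with `equals`), with case labels that must match whatever the `SpecStoreChangeEvent` schema actually defines; the names and return values below are assumptions for illustration only.

```java
public class OperationDispatchSketch {
  // Maps an operation-type name to the scheduler action it should trigger.
  // switch-on-String uses String.equals internally, unlike the == comparison.
  public static String dispatch(String operation) {
    switch (operation) {
      case "INSERT":  return "add";      // new spec persisted
      case "UPDATE":  return "update";   // existing spec changed
      case "DELETE":  return "delete";   // spec removed
      default:        return "unknown";  // real code would log a warning here
    }
  }
}
```

Switching on an enum (`value.getOperationType()`) directly would be safer still, since the compiler then checks the case labels against the schema-generated constants.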
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java:
##########
@@ -70,6 +70,9 @@ public class GobblinServiceConfiguration {
@Getter
private final boolean isHelixManagerEnabled;
+ @Getter
+ private final boolean isSpecStoreChangeMonitorEnabled;
Review Comment:
Can we leverage the warmStandbyEnabled key? We need this feature only when we enable the warm standby feature, where we use the CDC stream to do message forwarding.
Issue Time Tracking
-------------------
Worklog Id: (was: 807561)
Time Spent: 0.5h (was: 20m)
> New Consumer service that processes changes to Flow Spec Store
> --------------------------------------------------------------
>
> Key: GOBBLIN-1705
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1705
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: gobblin-service
> Reporter: Urmi Mustafi
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> In the multi-leader version of GaaS, the REST API layer will not directly
> contact the `GobblinServiceJobScheduler` to respond to API requests. Instead,
> after flow-level updates are persisted to MySQL, this new monitor will
> subscribe to Kafka events informing it of Flow Spec changes corresponding to
> the API requests and trigger their execution. A similar change will follow to
> handle other API requests.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)