This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 7474ad04b62 NIFI-15145 Add RecordLookup, KeyValueLookup, and MapCacheClient Services for Couchbase (#10467)
7474ad04b62 is described below

commit 7474ad04b62b3f0674f8bacaa2828ade71b66a00
Author: Mark Bathori <[email protected]>
AuthorDate: Wed Mar 4 21:02:28 2026 +0100

    NIFI-15145 Add RecordLookup, KeyValueLookup, and MapCacheClient Services for Couchbase (#10467)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../nifi-couchbase-nar/pom.xml                     |   5 +
 .../couchbase/AbstractCouchbaseProcessor.java      |  22 +--
 .../nifi/processors/couchbase/PutCouchbase.java    |   2 -
 .../couchbase/AbstractCouchbaseProcessorTest.java  |  41 ++++
 .../processors/couchbase/GetCouchbaseTest.java     |  71 +++----
 .../processors/couchbase/PutCouchbaseTest.java     |  48 ++---
 .../nifi/services/couchbase/CouchbaseClient.java   |  11 ++
 ...ion.java => CouchbaseCasMismatchException.java} |  10 +-
 ...ption.java => CouchbaseDocExistsException.java} |  10 +-
 ...ion.java => CouchbaseDocNotFoundException.java} |  10 +-
 .../couchbase/exception/CouchbaseException.java    |   2 +-
 .../CouchbaseLookupInResult.java}                  |  12 +-
 .../pom.xml                                        |  43 ++++-
 .../couchbase/AbstractCouchbaseService.java        |  93 +++++++++
 .../couchbase/CouchbaseKeyValueLookupService.java  |  85 +++++++++
 .../couchbase/CouchbaseMapCacheClient.java         | 209 +++++++++++++++++++++
 .../couchbase/CouchbaseRecordLookupService.java    | 103 ++++++++++
 .../org.apache.nifi.controller.ControllerService   |  17 ++
 .../couchbase/AbstractCouchbaseServiceTest.java}   |  23 ++-
 .../CouchbaseKeyValueLookupServiceTest.java        |  98 ++++++++++
 .../couchbase/CouchbaseMapCacheClientTest.java     | 116 ++++++++++++
 .../CouchbaseRecordLookupServiceTest.java          | 112 +++++++++++
 .../couchbase/StandardCouchbaseClient.java         | 139 ++++++++++++--
 .../StandardCouchbaseConnectionService.java        |  10 +-
 ...uchbaseClient.java => CouchbaseClientTest.java} |  84 ++++++++-
 .../nifi-couchbase-bundle/pom.xml                  |   1 +
 26 files changed, 1220 insertions(+), 157 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml
index 31482f7dadb..08cef39e101 100644
--- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml
@@ -32,6 +32,11 @@
             <artifactId>nifi-couchbase-processors</artifactId>
             <version>2.9.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-couchbase-services</artifactId>
+            <version>2.9.0-SNAPSHOT</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-couchbase-services-api-nar</artifactId>
diff --git 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
index 6d84e4e953e..ec9361e6d7a 100644
--- 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
+++ 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
@@ -50,11 +50,9 @@ import static 
org.apache.nifi.processors.couchbase.utils.CouchbaseAttributes.SCO
 
 public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
 
-    protected CouchbaseConnectionService connectionService;
-
     public static final PropertyDescriptor DOCUMENT_ID = new 
PropertyDescriptor.Builder()
             .name("Document ID")
-            .description("Couchbase document identifier, or an expression to 
construct the Couchbase document identifier.")
+            .description("Couchbase document identifier, or an expression to 
construct the Couchbase document identifier")
             .required(true)
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@@ -62,14 +60,14 @@ public abstract class AbstractCouchbaseProcessor extends 
AbstractProcessor {
 
     public static final PropertyDescriptor COUCHBASE_CONNECTION_SERVICE = new 
PropertyDescriptor.Builder()
             .name("Couchbase Connection Service")
-            .description("A Couchbase Connection Service which manages 
connections to a Couchbase cluster.")
+            .description("Service responsible for managing connections to 
Couchbase cluster")
             .required(true)
             .identifiesControllerService(CouchbaseConnectionService.class)
             .build();
 
     public static final PropertyDescriptor BUCKET_NAME = new 
PropertyDescriptor.Builder()
             .name("Bucket Name")
-            .description("The name of the bucket where documents will be 
stored. Each bucket contains a hierarchy of scopes and collections to group 
keys and values logically.")
+            .description("The name of the bucket where documents will be 
stored. Each bucket contains a hierarchy of scopes and collections to group 
keys and values logically")
             .required(true)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .defaultValue(DEFAULT_BUCKET)
@@ -78,7 +76,7 @@ public abstract class AbstractCouchbaseProcessor extends 
AbstractProcessor {
 
     public static final PropertyDescriptor SCOPE_NAME = new 
PropertyDescriptor.Builder()
             .name("Scope Name")
-            .description("The name of the scope  which is a logical namespace 
within a bucket, serving to categorize and organize related collections.")
+            .description("The name of the scope which is a logical namespace 
within a bucket, serving to categorize and organize related collections")
             .required(true)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .defaultValue(DEFAULT_SCOPE)
@@ -87,7 +85,7 @@ public abstract class AbstractCouchbaseProcessor extends 
AbstractProcessor {
 
     public static final PropertyDescriptor COLLECTION_NAME = new 
PropertyDescriptor.Builder()
             .name("Collection Name")
-            .description("The name of collection which is a logical container 
within a scope, used to hold documents.")
+            .description("The name of collection which is a logical container 
within a scope, used to hold documents")
             .required(true)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .defaultValue(DEFAULT_COLLECTION)
@@ -96,7 +94,7 @@ public abstract class AbstractCouchbaseProcessor extends 
AbstractProcessor {
 
     public static final PropertyDescriptor DOCUMENT_TYPE = new 
PropertyDescriptor.Builder()
             .name("Document Type")
-            .description("The content type for storing the document.")
+            .description("The content type for storing the document")
             .required(true)
             .allowableValues(DocumentType.values())
             .defaultValue(DocumentType.JSON.toString())
@@ -104,17 +102,17 @@ public abstract class AbstractCouchbaseProcessor extends 
AbstractProcessor {
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
-            .description("A FlowFile is routed to this relationship after the 
data ingestion was successful.")
+            .description("A FlowFile is routed to this relationship after the 
data ingestion was successful")
             .build();
 
     public static final Relationship REL_FAILURE = new Relationship.Builder()
             .name("failure")
-            .description("A FlowFile is routed to this relationship if the 
operation failed and retrying the operation will also fail, such as an invalid 
data or schema.")
+            .description("A FlowFile is routed to this relationship if the 
operation failed and retrying the operation will also fail, such as an invalid 
data or schema")
             .build();
 
     public static final Relationship REL_RETRY = new Relationship.Builder()
             .name("retry")
-            .description("All FlowFile that fail due to server/cluster 
availability go to this relationship.")
+            .description("All FlowFile that fail due to server/cluster 
availability go to this relationship")
             .build();
 
     private static final List<PropertyDescriptor> PROPERTIES = List.of(
@@ -128,6 +126,8 @@ public abstract class AbstractCouchbaseProcessor extends 
AbstractProcessor {
 
     private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, 
REL_FAILURE, REL_RETRY);
 
+    protected volatile CouchbaseConnectionService connectionService;
+
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return PROPERTIES;
diff --git 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbase.java
 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbase.java
index a2bc22816dd..1ffe1045f1c 100644
--- 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbase.java
+++ 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbase.java
@@ -28,7 +28,6 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.services.couchbase.CouchbaseClient;
-import org.apache.nifi.services.couchbase.CouchbaseConnectionService;
 import org.apache.nifi.services.couchbase.exception.CouchbaseException;
 import org.apache.nifi.services.couchbase.utils.CouchbaseContext;
 import org.apache.nifi.services.couchbase.utils.CouchbaseUpsertResult;
@@ -69,7 +68,6 @@ public class PutCouchbase extends AbstractCouchbaseProcessor {
         }
 
         final long startNanos = System.nanoTime();
-        final CouchbaseConnectionService connectionService = 
context.getProperty(COUCHBASE_CONNECTION_SERVICE).asControllerService(CouchbaseConnectionService.class);
         final String documentId = 
context.getProperty(DOCUMENT_ID).evaluateAttributeExpressions(flowFile).getValue();
 
         final CouchbaseContext couchbaseContext = getCouchbaseContext(context, 
flowFile);
diff --git 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessorTest.java
 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessorTest.java
new file mode 100644
index 00000000000..5cbf0243921
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessorTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.couchbase;
+
+import org.apache.nifi.services.couchbase.CouchbaseClient;
+import org.apache.nifi.services.couchbase.CouchbaseConnectionService;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public abstract class AbstractCouchbaseProcessorTest {
+
+    protected static final String SERVICE_ID = "couchbaseConnectionService";
+    protected static final String TEST_DOCUMENT_ID = "test-document-id";
+    protected static final String TEST_DOCUMENT_CONTENT = 
"{\"key\":\"value\"}";
+    protected static final String TEST_SERVICE_LOCATION = 
"couchbase://test-location";
+    protected static final long TEST_CAS = 1L;
+
+    protected static CouchbaseConnectionService 
mockConnectionService(CouchbaseClient client) {
+        final CouchbaseConnectionService connectionService = 
mock(CouchbaseConnectionService.class);
+        when(connectionService.getIdentifier()).thenReturn(SERVICE_ID);
+        when(connectionService.getClient(any())).thenReturn(client);
+        
when(connectionService.getServiceLocation()).thenReturn(TEST_SERVICE_LOCATION);
+        return connectionService;
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/GetCouchbaseTest.java
 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/GetCouchbaseTest.java
index 97595b7d1d4..4f535e34f9f 100644
--- 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/GetCouchbaseTest.java
+++ 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/GetCouchbaseTest.java
@@ -31,8 +31,8 @@ import org.apache.nifi.util.TestRunners;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -60,12 +60,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-public class GetCouchbaseTest {
-
-    private static final String SERVICE_ID = "couchbaseConnectionService";
-    private static final String TEST_DOCUMENT_ID = "test-document-id";
-    private static final String TEST_SERVICE_LOCATION = 
"couchbase://test-location";
-    private static final long TEST_CAS = 1L;
+public class GetCouchbaseTest extends AbstractCouchbaseProcessorTest {
 
     private TestRunner runner;
 
@@ -76,20 +71,15 @@ public class GetCouchbaseTest {
 
     @Test
     public void testOnTriggerWithProvidedDocumentId() throws 
CouchbaseException, InitializationException {
-        final String content = "{\"key\":\"value\"}";
-
-        final CouchbaseGetResult result = new 
CouchbaseGetResult(content.getBytes(), TEST_CAS);
+        final CouchbaseGetResult result = new 
CouchbaseGetResult(TEST_DOCUMENT_CONTENT.getBytes(), TEST_CAS);
 
         final CouchbaseClient client = mock(CouchbaseClient.class);
         when(client.getDocument(anyString())).thenReturn(result);
 
-        final CouchbaseConnectionService service = 
mock(CouchbaseConnectionService.class);
-        when(service.getIdentifier()).thenReturn(SERVICE_ID);
-        when(service.getClient(any())).thenReturn(client);
-        when(service.getServiceLocation()).thenReturn(TEST_SERVICE_LOCATION);
+        final CouchbaseConnectionService connectionService = 
mockConnectionService(client);
 
-        runner.addControllerService(SERVICE_ID, service);
-        runner.enableControllerService(service);
+        runner.addControllerService(SERVICE_ID, connectionService);
+        runner.enableControllerService(connectionService);
         runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID);
         runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
         runner.enqueue(new byte[0]);
@@ -101,7 +91,7 @@ public class GetCouchbaseTest {
         runner.assertTransferCount(REL_FAILURE, 0);
 
         final MockFlowFile outFile = 
runner.getFlowFilesForRelationship(REL_SUCCESS).getFirst();
-        outFile.assertContentEquals(content);
+        outFile.assertContentEquals(TEST_DOCUMENT_CONTENT);
         outFile.assertAttributeEquals(BUCKET_ATTRIBUTE, DEFAULT_BUCKET);
         outFile.assertAttributeEquals(SCOPE_ATTRIBUTE, DEFAULT_SCOPE);
         outFile.assertAttributeEquals(COLLECTION_ATTRIBUTE, 
DEFAULT_COLLECTION);
@@ -116,24 +106,18 @@ public class GetCouchbaseTest {
 
     @Test
     public void testWithDocumentIdAsFlowFileAttribute() throws 
CouchbaseException, InitializationException {
-        final String content = "{\"key\":\"value\"}";
-
-        final CouchbaseGetResult result = new 
CouchbaseGetResult(content.getBytes(), TEST_CAS);
+        final CouchbaseGetResult result = new 
CouchbaseGetResult(TEST_DOCUMENT_CONTENT.getBytes(), TEST_CAS);
 
         final CouchbaseClient client = mock(CouchbaseClient.class);
         when(client.getDocument(anyString())).thenReturn(result);
 
-        final CouchbaseConnectionService service = 
mock(CouchbaseConnectionService.class);
-        when(service.getIdentifier()).thenReturn(SERVICE_ID);
-        when(service.getClient(any())).thenReturn(client);
+        final CouchbaseConnectionService connectionService = 
mockConnectionService(client);
 
         final MockFlowFile flowFile = new MockFlowFile(0);
-        final Map<String, String> attributes = new HashMap<>();
-        attributes.put("flowfile_document_id", TEST_DOCUMENT_ID);
-        flowFile.putAttributes(attributes);
+        
flowFile.putAttributes(Collections.singletonMap("flowfile_document_id", 
TEST_DOCUMENT_ID));
 
-        runner.addControllerService(SERVICE_ID, service);
-        runner.enableControllerService(service);
+        runner.addControllerService(SERVICE_ID, connectionService);
+        runner.enableControllerService(connectionService);
         runner.setProperty(DOCUMENT_ID, "${flowfile_document_id}");
         runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
         runner.enqueue(flowFile);
@@ -145,7 +129,7 @@ public class GetCouchbaseTest {
         runner.assertTransferCount(REL_FAILURE, 0);
 
         final MockFlowFile outFile = 
runner.getFlowFilesForRelationship(REL_SUCCESS).getFirst();
-        outFile.assertContentEquals(content);
+        outFile.assertContentEquals(TEST_DOCUMENT_CONTENT);
         outFile.assertAttributeEquals(BUCKET_ATTRIBUTE, DEFAULT_BUCKET);
         outFile.assertAttributeEquals(SCOPE_ATTRIBUTE, DEFAULT_SCOPE);
         outFile.assertAttributeEquals(COLLECTION_ATTRIBUTE, 
DEFAULT_COLLECTION);
@@ -154,24 +138,20 @@ public class GetCouchbaseTest {
 
     @Test
     public void testWithFlowFileAttributes() throws CouchbaseException, 
InitializationException {
-        final String documentId = "test-document-id";
-        final String content = "{\"key\":\"value\"}";
         final String testBucket = "test-bucket";
         final String testScope = "test-scope";
         final String testCollection = "test-collection";
 
-        final CouchbaseGetResult result = new 
CouchbaseGetResult(content.getBytes(), TEST_CAS);
+        final CouchbaseGetResult result = new 
CouchbaseGetResult(TEST_DOCUMENT_CONTENT.getBytes(), TEST_CAS);
 
         final CouchbaseClient client = mock(CouchbaseClient.class);
         when(client.getDocument(anyString())).thenReturn(result);
 
-        final CouchbaseConnectionService service = 
mock(CouchbaseConnectionService.class);
-        when(service.getIdentifier()).thenReturn(SERVICE_ID);
-        when(service.getClient(any())).thenReturn(client);
+        final CouchbaseConnectionService connectionService = 
mockConnectionService(client);
 
-        runner.addControllerService(SERVICE_ID, service);
-        runner.enableControllerService(service);
-        runner.setProperty(DOCUMENT_ID, documentId);
+        runner.addControllerService(SERVICE_ID, connectionService);
+        runner.enableControllerService(connectionService);
+        runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID);
         runner.setProperty(BUCKET_NAME, "${bucket.attribute}");
         runner.setProperty(SCOPE_NAME, "${scope.attribute}");
         runner.setProperty(COLLECTION_NAME, "${collection.attribute}");
@@ -181,17 +161,16 @@ public class GetCouchbaseTest {
         attributes.put("bucket.attribute", testBucket);
         attributes.put("scope.attribute", testScope);
         attributes.put("collection.attribute", testCollection);
-        final byte[] input = documentId.getBytes(StandardCharsets.UTF_8);
-        runner.enqueue(input, attributes);
+        runner.enqueue(new byte[0], attributes);
         runner.run();
 
-        verify(client, times(1)).getDocument(eq(documentId));
+        verify(client, times(1)).getDocument(eq(TEST_DOCUMENT_ID));
 
         runner.assertTransferCount(REL_SUCCESS, 1);
         runner.assertTransferCount(REL_FAILURE, 0);
 
         final MockFlowFile outFile = 
runner.getFlowFilesForRelationship(REL_SUCCESS).getFirst();
-        outFile.assertContentEquals(content);
+        outFile.assertContentEquals(TEST_DOCUMENT_CONTENT);
         outFile.assertAttributeEquals(BUCKET_ATTRIBUTE, testBucket);
         outFile.assertAttributeEquals(SCOPE_ATTRIBUTE, testScope);
         outFile.assertAttributeEquals(COLLECTION_ATTRIBUTE, testCollection);
@@ -204,13 +183,11 @@ public class GetCouchbaseTest {
         
when(client.getExceptionCategory(any())).thenReturn(ExceptionCategory.FAILURE);
         when(client.getDocument(anyString())).thenThrow(new 
CouchbaseException("", new TestCouchbaseException("Test exception")));
 
-        final CouchbaseConnectionService service = 
mock(CouchbaseConnectionService.class);
-        when(service.getIdentifier()).thenReturn(SERVICE_ID);
-        when(service.getClient(any())).thenReturn(client);
+        final CouchbaseConnectionService connectionService = 
mockConnectionService(client);
 
         runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID);
-        runner.addControllerService(SERVICE_ID, service);
-        runner.enableControllerService(service);
+        runner.addControllerService(SERVICE_ID, connectionService);
+        runner.enableControllerService(connectionService);
         runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
         runner.enqueue(new byte[0]);
         runner.run();
diff --git 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/PutCouchbaseTest.java
 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/PutCouchbaseTest.java
index 60cd3a114a2..320beb848eb 100644
--- 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/PutCouchbaseTest.java
+++ 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/PutCouchbaseTest.java
@@ -57,12 +57,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-public class PutCouchbaseTest {
-
-    private static final String SERVICE_ID = "couchbaseConnectionService";
-    private static final String TEST_DOCUMENT_ID = "test-document-id";
-    private static final String TEST_SERVICE_LOCATION = 
"couchbase://test-location";
-    private static final long TEST_CAS = 1L;
+public class PutCouchbaseTest extends AbstractCouchbaseProcessorTest {
 
     private TestRunner runner;
 
@@ -76,18 +71,15 @@ public class PutCouchbaseTest {
         final CouchbaseClient client = mock(CouchbaseClient.class);
         when(client.upsertDocument(anyString(), any())).thenReturn(new 
CouchbaseUpsertResult(TEST_CAS));
 
-        final CouchbaseConnectionService service = 
mock(CouchbaseConnectionService.class);
-        when(service.getIdentifier()).thenReturn(SERVICE_ID);
-        when(service.getClient(any())).thenReturn(client);
-        when(service.getServiceLocation()).thenReturn(TEST_SERVICE_LOCATION);
+        final CouchbaseConnectionService connectionService = 
mockConnectionService(client);
 
         final MockFlowFile flowFile = new MockFlowFile(0);
         final Map<String, String> attributes = new HashMap<>();
         attributes.put("flowfile_document_id", TEST_DOCUMENT_ID);
         flowFile.putAttributes(attributes);
 
-        runner.addControllerService(SERVICE_ID, service);
-        runner.enableControllerService(service);
+        runner.addControllerService(SERVICE_ID, connectionService);
+        runner.enableControllerService(connectionService);
         runner.setProperty(DOCUMENT_ID, "${flowfile_document_id}");
         runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
         runner.setValidateExpressionUsage(false);
@@ -120,12 +112,10 @@ public class PutCouchbaseTest {
         
when(client.getExceptionCategory(any())).thenReturn(ExceptionCategory.FAILURE);
         when(client.upsertDocument(anyString(), any())).thenThrow(new 
CouchbaseException("", new TestCouchbaseException("Test exception")));
 
-        final CouchbaseConnectionService service = 
mock(CouchbaseConnectionService.class);
-        when(service.getIdentifier()).thenReturn(SERVICE_ID);
-        when(service.getClient(any())).thenReturn(client);
+        final CouchbaseConnectionService connectionService = 
mockConnectionService(client);
 
-        runner.addControllerService(SERVICE_ID, service);
-        runner.enableControllerService(service);
+        runner.addControllerService(SERVICE_ID, connectionService);
+        runner.enableControllerService(connectionService);
         runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID);
         runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
         runner.enqueue(new byte[0]);
@@ -143,12 +133,10 @@ public class PutCouchbaseTest {
         
when(client.getExceptionCategory(any())).thenReturn(ExceptionCategory.RETRY);
         when(client.upsertDocument(anyString(), any())).thenThrow(new 
CouchbaseException("", new TestCouchbaseException("Test exception")));
 
-        final CouchbaseConnectionService service = 
mock(CouchbaseConnectionService.class);
-        when(service.getIdentifier()).thenReturn(SERVICE_ID);
-        when(service.getClient(any())).thenReturn(client);
+        final CouchbaseConnectionService connectionService = 
mockConnectionService(client);
 
-        runner.addControllerService(SERVICE_ID, service);
-        runner.enableControllerService(service);
+        runner.addControllerService(SERVICE_ID, connectionService);
+        runner.enableControllerService(connectionService);
         runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID);
         runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
         runner.enqueue(new byte[0]);
@@ -166,12 +154,10 @@ public class PutCouchbaseTest {
         
when(client.getExceptionCategory(any())).thenReturn(ExceptionCategory.ROLLBACK);
         when(client.upsertDocument(anyString(), any())).thenThrow(new 
CouchbaseException("", new TestCouchbaseException("Test exception")));
 
-        final CouchbaseConnectionService service = 
mock(CouchbaseConnectionService.class);
-        when(service.getIdentifier()).thenReturn(SERVICE_ID);
-        when(service.getClient(any())).thenReturn(client);
+        final CouchbaseConnectionService connectionService = 
mockConnectionService(client);
 
-        runner.addControllerService(SERVICE_ID, service);
-        runner.enableControllerService(service);
+        runner.addControllerService(SERVICE_ID, connectionService);
+        runner.enableControllerService(connectionService);
         runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID);
         runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
         runner.enqueue(new byte[0]);
@@ -189,12 +175,10 @@ public class PutCouchbaseTest {
         
when(client.getExceptionCategory(any())).thenReturn(ExceptionCategory.FAILURE);
         when(client.upsertDocument(anyString(), any())).thenThrow(new 
CouchbaseException("", new TestCouchbaseException("Test exception")));
 
-        final CouchbaseConnectionService service = 
mock(CouchbaseConnectionService.class);
-        when(service.getIdentifier()).thenReturn(SERVICE_ID);
-        when(service.getClient(any())).thenReturn(client);
+        final CouchbaseConnectionService connectionService = 
mockConnectionService(client);
 
-        runner.addControllerService(SERVICE_ID, service);
-        runner.enableControllerService(service);
+        runner.addControllerService(SERVICE_ID, connectionService);
+        runner.enableControllerService(connectionService);
         runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID);
         runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
         runner.enqueue(new byte[0]);
diff --git 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/CouchbaseClient.java
 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/CouchbaseClient.java
index 9ad04fbe01d..96797c1a5d2 100644
--- 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/CouchbaseClient.java
+++ 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/CouchbaseClient.java
@@ -19,6 +19,7 @@ package org.apache.nifi.services.couchbase;
 import org.apache.nifi.services.couchbase.exception.CouchbaseException;
 import org.apache.nifi.services.couchbase.exception.ExceptionCategory;
 import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult;
+import org.apache.nifi.services.couchbase.utils.CouchbaseLookupInResult;
 import org.apache.nifi.services.couchbase.utils.CouchbaseUpsertResult;
 
 public interface CouchbaseClient {
@@ -27,5 +28,15 @@ public interface CouchbaseClient {
 
     CouchbaseUpsertResult upsertDocument(String documentId, byte[] content) 
throws CouchbaseException;
 
+    boolean documentExists(String documentId) throws CouchbaseException;
+
+    void insertDocument(String documentId, byte[] content) throws 
CouchbaseException;
+
+    void removeDocument(String documentId) throws CouchbaseException;
+
+    void replaceDocument(String documentId, byte[] content, long cas) throws 
CouchbaseException;
+
+    CouchbaseLookupInResult lookupIn(String documentId, String subDocPath) 
throws CouchbaseException;
+
     ExceptionCategory getExceptionCategory(Throwable throwable);
 }
diff --git 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseCasMismatchException.java
similarity index 78%
copy from 
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
copy to 
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseCasMismatchException.java
index ea2436e621f..6153f184c39 100644
--- 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
+++ 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseCasMismatchException.java
@@ -16,13 +16,9 @@
  */
 package org.apache.nifi.services.couchbase.exception;
 
-public class CouchbaseException extends Exception {
+public class CouchbaseCasMismatchException extends CouchbaseException {
 
-    public CouchbaseException(final String message) {
-        super(message);
-    }
-
-    public CouchbaseException(final String message, final Throwable cause) {
-        super(cause);
+    public CouchbaseCasMismatchException(final String message, final Throwable 
cause) {
+        super(message, cause);
     }
 }
diff --git 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseDocExistsException.java
similarity index 78%
copy from 
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
copy to 
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseDocExistsException.java
index ea2436e621f..044507e4ac5 100644
--- 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
+++ 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseDocExistsException.java
@@ -16,13 +16,9 @@
  */
 package org.apache.nifi.services.couchbase.exception;
 
-public class CouchbaseException extends Exception {
+public class CouchbaseDocExistsException extends CouchbaseException {
 
-    public CouchbaseException(final String message) {
-        super(message);
-    }
-
-    public CouchbaseException(final String message, final Throwable cause) {
-        super(cause);
+    public CouchbaseDocExistsException(final String message, final Throwable 
cause) {
+        super(message, cause);
     }
 }
diff --git 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseDocNotFoundException.java
similarity index 78%
copy from 
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
copy to 
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseDocNotFoundException.java
index ea2436e621f..cb8090b448e 100644
--- 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
+++ 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseDocNotFoundException.java
@@ -16,13 +16,9 @@
  */
 package org.apache.nifi.services.couchbase.exception;
 
-public class CouchbaseException extends Exception {
+public class CouchbaseDocNotFoundException extends CouchbaseException {
 
-    public CouchbaseException(final String message) {
-        super(message);
-    }
-
-    public CouchbaseException(final String message, final Throwable cause) {
-        super(cause);
+    public CouchbaseDocNotFoundException(final String message, final Throwable 
cause) {
+        super(message, cause);
     }
 }
diff --git 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
index ea2436e621f..600e38dc1c9 100644
--- 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
+++ 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
@@ -23,6 +23,6 @@ public class CouchbaseException extends Exception {
     }
 
     public CouchbaseException(final String message, final Throwable cause) {
-        super(cause);
+        super(message, cause);
     }
 }
diff --git 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/utils/CouchbaseLookupInResult.java
similarity index 73%
copy from 
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
copy to 
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/utils/CouchbaseLookupInResult.java
index ea2436e621f..c916e29f343 100644
--- 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
+++ 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/utils/CouchbaseLookupInResult.java
@@ -14,15 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.services.couchbase.exception;
+package org.apache.nifi.services.couchbase.utils;
 
-public class CouchbaseException extends Exception {
-
-    public CouchbaseException(final String message) {
-        super(message);
-    }
-
-    public CouchbaseException(final String message, final Throwable cause) {
-        super(cause);
-    }
+public record CouchbaseLookupInResult(Object resultContent, long cas) {
 }
diff --git 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/pom.xml
similarity index 52%
copy from 
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml
copy to 
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/pom.xml
index 31482f7dadb..0136ce0bdb2 100644
--- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml
+++ 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/pom.xml
@@ -23,21 +23,52 @@
         <version>2.9.0-SNAPSHOT</version>
     </parent>
 
-    <artifactId>nifi-couchbase-nar</artifactId>
-    <packaging>nar</packaging>
+    <artifactId>nifi-couchbase-services</artifactId>
+    <packaging>jar</packaging>
 
     <dependencies>
+        <!-- Internal dependencies -->
         <dependency>
             <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-couchbase-processors</artifactId>
+            <artifactId>nifi-couchbase-services-api</artifactId>
             <version>2.9.0-SNAPSHOT</version>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-couchbase-services-api-nar</artifactId>
+            <artifactId>nifi-lookup-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-json-record-utils</artifactId>
+            <version>2.9.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-avro-record-utils</artifactId>
+            <version>2.9.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-distributed-cache-client-service-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Test dependencies -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
             <version>2.9.0-SNAPSHOT</version>
-            <type>nar</type>
+            <scope>test</scope>
         </dependency>
     </dependencies>
 
-</project>
\ No newline at end of file
+</project>
diff --git 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/AbstractCouchbaseService.java
 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/AbstractCouchbaseService.java
new file mode 100644
index 00000000000..9c9d66348f7
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/AbstractCouchbaseService.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase;
+
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.services.couchbase.utils.CouchbaseContext;
+import org.apache.nifi.services.couchbase.utils.DocumentType;
+
+import java.util.Set;
+
+/**
+ * Common base for Couchbase controller services. It declares the shared connection,
+ * bucket, scope and collection properties and, when the service is enabled, obtains a
+ * {@link CouchbaseClient} for the configured location from the connection service.
+ */
+public abstract class AbstractCouchbaseService extends AbstractControllerService {
+
+    /** Name of the coordinate key returned by {@link #getRequiredKeys()}. */
+    protected static final String KEY = "key";
+    protected static final Set<String> REQUIRED_KEYS = Set.of(KEY);
+
+    private static final String DEFAULT_BUCKET = "default";
+    private static final String DEFAULT_SCOPE = "_default";
+    private static final String DEFAULT_COLLECTION = "_default";
+
+    public static final PropertyDescriptor COUCHBASE_CONNECTION_SERVICE = new PropertyDescriptor.Builder()
+            .name("Couchbase Connection Service")
+            .description("Service responsible for managing connections to Couchbase cluster")
+            .required(true)
+            .identifiesControllerService(CouchbaseConnectionService.class)
+            .build();
+
+    public static final PropertyDescriptor BUCKET_NAME = locationProperty(
+            "Bucket Name",
+            "The name of the bucket where documents will be stored. Each bucket contains a hierarchy of scopes and collections to group keys and values logically",
+            DEFAULT_BUCKET);
+
+    public static final PropertyDescriptor SCOPE_NAME = locationProperty(
+            "Scope Name",
+            "The name of the scope which is a logical namespace within a bucket, serving to categorize and organize related collections",
+            DEFAULT_SCOPE);
+
+    public static final PropertyDescriptor COLLECTION_NAME = locationProperty(
+            "Collection Name",
+            "The name of collection which is a logical container within a scope, used to hold documents",
+            DEFAULT_COLLECTION);
+
+    /** Client created in {@link #onEnabled(ConfigurationContext)}; remains {@code null} until then. */
+    protected volatile CouchbaseClient couchbaseClient;
+
+    /**
+     * Looks up the configured connection service and asks it for a client bound to the
+     * configured bucket, scope and collection, using {@link DocumentType#BINARY}.
+     *
+     * @param context configuration the property values are read from
+     */
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        final CouchbaseConnectionService connections = context.getProperty(COUCHBASE_CONNECTION_SERVICE)
+                .asControllerService(CouchbaseConnectionService.class);
+        final CouchbaseContext location = new CouchbaseContext(
+                evaluate(context, BUCKET_NAME),
+                evaluate(context, SCOPE_NAME),
+                evaluate(context, COLLECTION_NAME),
+                DocumentType.BINARY);
+        couchbaseClient = connections.getClient(location);
+    }
+
+    /**
+     * @return {@link #REQUIRED_KEYS}, which contains only {@link #KEY}
+     */
+    public Set<String> getRequiredKeys() {
+        return REQUIRED_KEYS;
+    }
+
+    /** Builds a required, non-blank property with a default value and environment-scoped expression language support. */
+    private static PropertyDescriptor locationProperty(final String name, final String description, final String defaultValue) {
+        return new PropertyDescriptor.Builder()
+                .name(name)
+                .description(description)
+                .required(true)
+                .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+                .defaultValue(defaultValue)
+                .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+                .build();
+    }
+
+    /** Returns the value of the given property after expression evaluation. */
+    private static String evaluate(final ConfigurationContext context, final PropertyDescriptor descriptor) {
+        return context.getProperty(descriptor).evaluateAttributeExpressions().getValue();
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupService.java
 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupService.java
new file mode 100644
index 00000000000..8a35b0ea2d4
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupService.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.lookup.StringLookupService;
+import org.apache.nifi.processor.util.StandardValidators;
+import 
org.apache.nifi.services.couchbase.exception.CouchbaseDocNotFoundException;
+import org.apache.nifi.services.couchbase.utils.CouchbaseLookupInResult;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * {@link StringLookupService} that resolves the coordinate named {@code key} to a document ID
+ * and returns the string form of the value found at the configured Sub-Document path.
+ */
+@Tags({"lookup", "enrich", "key", "value", "couchbase"})
+@CapabilityDescription("Lookup a string value from Couchbase Server associated with the specified key. The coordinates that are passed to the lookup must contain the key 'key'.")
+public class CouchbaseKeyValueLookupService extends AbstractCouchbaseService implements StringLookupService {
+
+    public static final PropertyDescriptor LOOKUP_SUB_DOC_PATH = new PropertyDescriptor.Builder()
+            .name("Lookup Sub-Document Path")
+            .description("The Sub-Document lookup path within the target JSON document")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = List.of(
+            COUCHBASE_CONNECTION_SERVICE,
+            BUCKET_NAME,
+            SCOPE_NAME,
+            COLLECTION_NAME,
+            LOOKUP_SUB_DOC_PATH
+    );
+
+    /** Evaluated value of {@link #LOOKUP_SUB_DOC_PATH}, captured when the service is enabled. */
+    private volatile String subDocPath;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    /**
+     * Runs the base-class enable logic, then stores the evaluated Sub-Document path.
+     *
+     * @param context configuration the property values are read from
+     */
+    @Override
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        super.onEnabled(context);
+        subDocPath = context.getProperty(LOOKUP_SUB_DOC_PATH).evaluateAttributeExpressions().getValue();
+    }
+
+    /**
+     * Looks up the value at the configured Sub-Document path of the document identified by
+     * the {@code key} coordinate.
+     *
+     * @param coordinates lookup coordinates; only the entry named {@code key} is read
+     * @return the string form of the looked-up content, or an empty Optional when the coordinate
+     *         is absent, the document is not found, or the content is {@code null}
+     * @throws LookupFailureException if the lookup fails for any reason other than a missing document
+     */
+    @Override
+    public Optional<String> lookup(final Map<String, Object> coordinates) throws LookupFailureException {
+        final Object lookupKey = coordinates.get(KEY);
+        if (lookupKey == null) {
+            return Optional.empty();
+        }
+
+        try {
+            final CouchbaseLookupInResult lookupResult = couchbaseClient.lookupIn(lookupKey.toString(), subDocPath);
+            final Object content = lookupResult.resultContent();
+            if (content == null) {
+                return Optional.empty();
+            }
+            return Optional.ofNullable(content.toString());
+        } catch (final CouchbaseDocNotFoundException e) {
+            // A missing document is a lookup miss, not a failure.
+            return Optional.empty();
+        } catch (final Exception e) {
+            throw new LookupFailureException(String.format("Key-value lookup for Document ID [%s] failed", lookupKey), e);
+        }
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClient.java
 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClient.java
new file mode 100644
index 00000000000..9ff3354f0af
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClient.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import 
org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import 
org.apache.nifi.services.couchbase.exception.CouchbaseCasMismatchException;
+import 
org.apache.nifi.services.couchbase.exception.CouchbaseDocExistsException;
+import 
org.apache.nifi.services.couchbase.exception.CouchbaseDocNotFoundException;
+import org.apache.nifi.services.couchbase.exception.CouchbaseException;
+import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * {@link AtomicDistributedMapCacheClient} that stores cache entries as Couchbase documents.
+ * The document ID is the cache key in String form (see {@code serializeDocumentKey}), the
+ * document body is the serialized value, and the document CAS value is used as the entry revision.
+ */
+@Tags({"distributed", "cache", "map", "cluster", "couchbase"})
+@CapabilityDescription("""
+        Provides the ability to communicate with a Couchbase Server cluster as a DistributedMapCacheServer.
+        This can be used in order to share a Map between nodes in a NiFi cluster.
+        Couchbase Server cluster can provide a high available and persistent cache storage.""")
+public class CouchbaseMapCacheClient extends AbstractCouchbaseService implements AtomicDistributedMapCacheClient<Long> {
+
+    private static final List<PropertyDescriptor> PROPERTIES = List.of(
+            COUCHBASE_CONNECTION_SERVICE,
+            BUCKET_NAME,
+            SCOPE_NAME,
+            COLLECTION_NAME
+    );
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    /**
+     * Fetches the entry for the given key together with its CAS value as revision.
+     *
+     * @return the cache entry, or {@code null} when no document exists for the key
+     * @throws IOException if the key is empty, (de)serialization fails or Couchbase reports an error
+     */
+    @Override
+    public <K, V> AtomicCacheEntry<K, V, Long> fetch(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
+        final String documentId = serializeDocumentKey(key, keySerializer);
+        final Optional<CouchbaseGetResult> found = findDocument(documentId);
+        if (found.isEmpty()) {
+            return null;
+        }
+
+        final CouchbaseGetResult result = found.get();
+        return new AtomicCacheEntry<>(key, valueDeserializer.deserialize(result.resultContent()), result.cas());
+    }
+
+    /**
+     * Writes the entry conditionally: without a revision the document is inserted only if it does
+     * not exist yet; with a revision the document is replaced using that revision as CAS value.
+     *
+     * @return {@code true} if the document was written; {@code false} if it already existed (insert)
+     *         or was missing or had a different CAS value (replace)
+     * @throws IOException if the key is empty, serialization fails or Couchbase reports another error
+     */
+    @Override
+    public <K, V> boolean replace(final AtomicCacheEntry<K, V, Long> entry, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
+        final String documentId = serializeDocumentKey(entry.getKey(), keySerializer);
+        final byte[] document = serializeDocument(entry.getValue(), valueSerializer);
+        final Optional<Long> revision = entry.getRevision();
+
+        if (revision.isEmpty()) {
+            return insertIfAbsent(documentId, document);
+        }
+
+        try {
+            final long casValue = revision.get();
+            couchbaseClient.replaceDocument(documentId, document, casValue);
+            return true;
+        } catch (final CouchbaseDocNotFoundException | CouchbaseCasMismatchException e) {
+            return false;
+        } catch (final CouchbaseException e) {
+            throw new IOException("Failed to replace cache entry with Document ID [%s] in Couchbase".formatted(documentId), e);
+        }
+    }
+
+    /**
+     * Inserts the value only if no document exists for the key.
+     *
+     * @return {@code true} if the document was inserted, {@code false} if it already existed
+     * @throws IOException if the key is empty, serialization fails or Couchbase reports another error
+     */
+    @Override
+    public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
+        final String documentId = serializeDocumentKey(key, keySerializer);
+        final byte[] document = serializeDocument(value, valueSerializer);
+
+        return insertIfAbsent(documentId, document);
+    }
+
+    /**
+     * Returns the value of the existing document for the key, or inserts the given value when
+     * no document exists.
+     *
+     * @return the deserialized existing value, or {@code null} when the given value was inserted
+     * @throws IOException if the key is empty, (de)serialization fails or Couchbase reports an error
+     */
+    @Override
+    public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final Deserializer<V> valueDeserializer) throws IOException {
+        final String documentId = serializeDocumentKey(key, keySerializer);
+        byte[] document = null;
+
+        // Iterative retry: the loop only repeats when another client inserted the document
+        // between our read and our insert, so the stack cannot grow under contention.
+        while (true) {
+            final Optional<CouchbaseGetResult> existing = findDocument(documentId);
+            if (existing.isPresent()) {
+                // Return whatever the existing document deserializes to, even null, instead of
+                // retrying forever against a document that is already present.
+                return valueDeserializer.deserialize(existing.get().resultContent());
+            }
+
+            if (document == null) {
+                // Serialize lazily so an existing entry is returned without touching the value.
+                document = serializeDocument(value, valueSerializer);
+            }
+
+            if (insertIfAbsent(documentId, document)) {
+                return null;
+            }
+        }
+    }
+
+    /**
+     * @return whether a document exists for the key
+     * @throws IOException if the key is empty, key serialization fails or Couchbase reports an error
+     */
+    @Override
+    public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
+        final String documentId = serializeDocumentKey(key, keySerializer);
+
+        try {
+            return couchbaseClient.documentExists(documentId);
+        } catch (final CouchbaseException e) {
+            throw new IOException("Failed to check existence of cache entry with Document ID [%s] in Couchbase".formatted(documentId), e);
+        }
+    }
+
+    /**
+     * Stores the value for the key through an upsert.
+     *
+     * @throws IOException if the key is empty, serialization fails or Couchbase reports an error
+     */
+    @Override
+    public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
+        final String documentId = serializeDocumentKey(key, keySerializer);
+        final byte[] document = serializeDocument(value, valueSerializer);
+
+        try {
+            couchbaseClient.upsertDocument(documentId, document);
+        } catch (final CouchbaseException e) {
+            throw new IOException("Failed to insert cache entry with Document ID [%s] into Couchbase".formatted(documentId), e);
+        }
+    }
+
+    /**
+     * @return the deserialized value for the key, or {@code null} when no document exists
+     * @throws IOException if the key is empty, (de)serialization fails or Couchbase reports an error
+     */
+    @Override
+    public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
+        final String documentId = serializeDocumentKey(key, keySerializer);
+        final Optional<CouchbaseGetResult> found = findDocument(documentId);
+        if (found.isEmpty()) {
+            return null;
+        }
+
+        return valueDeserializer.deserialize(found.get().resultContent());
+    }
+
+    /** No-op: this class holds no resources of its own to release. */
+    @Override
+    public void close() {
+    }
+
+    /**
+     * Removes the document for the key.
+     *
+     * @return {@code true} if the document was removed, {@code false} if it did not exist
+     * @throws IOException if the key is empty, key serialization fails or Couchbase reports another error
+     */
+    @Override
+    public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException {
+        final String documentId = serializeDocumentKey(key, serializer);
+
+        try {
+            couchbaseClient.removeDocument(documentId);
+            return true;
+        } catch (final CouchbaseDocNotFoundException e) {
+            return false;
+        } catch (final CouchbaseException e) {
+            throw new IOException("Failed to remove cache entry with Document ID [%s] from Couchbase".formatted(documentId), e);
+        }
+    }
+
+    /** Reads the document with the given ID; an empty Optional means the document was not found. */
+    private Optional<CouchbaseGetResult> findDocument(final String documentId) throws IOException {
+        try {
+            return Optional.of(couchbaseClient.getDocument(documentId));
+        } catch (final CouchbaseDocNotFoundException e) {
+            return Optional.empty();
+        } catch (final CouchbaseException e) {
+            throw new IOException("Failed to fetch cache entry with Document ID [%s] from Couchbase".formatted(documentId), e);
+        }
+    }
+
+    /** Inserts the document; returns {@code false} instead of failing when it already exists. */
+    private boolean insertIfAbsent(final String documentId, final byte[] document) throws IOException {
+        try {
+            couchbaseClient.insertDocument(documentId, document);
+            return true;
+        } catch (final CouchbaseDocExistsException e) {
+            return false;
+        } catch (final CouchbaseException e) {
+            throw new IOException("Failed to insert cache entry with Document ID [%s] into Couchbase".formatted(documentId), e);
+        }
+    }
+
+    /**
+     * Converts a cache key into a document ID: String keys are used as-is, any other key is
+     * serialized and the bytes are decoded as UTF-8. An empty result is rejected.
+     */
+    private static <S> String serializeDocumentKey(final S key, final Serializer<S> serializer) throws IOException {
+        final String result;
+
+        if (key instanceof String text) {
+            result = text;
+        } else {
+            final ByteArrayOutputStream stream = new ByteArrayOutputStream();
+            serializer.serialize(key, stream);
+            result = stream.toString(StandardCharsets.UTF_8);
+        }
+
+        if (result.isEmpty()) {
+            throw new IOException("Cache entry key cannot be empty!");
+        }
+
+        return result;
+    }
+
+    /** Serializes a cache value into the bytes stored as the document body. */
+    private static <S> byte[] serializeDocument(final S value, final Serializer<S> serializer) throws IOException {
+        final ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        serializer.serialize(value, stream);
+        return stream.toByteArray();
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupService.java
 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupService.java
new file mode 100644
index 00000000000..4dcf3b40f5e
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupService.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.lookup.RecordLookupService;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import 
org.apache.nifi.services.couchbase.exception.CouchbaseDocNotFoundException;
+import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Lookup service that fetches a single document from Couchbase by the Document ID found in the
+ * lookup coordinates and parses it into a {@link Record} with the configured Record Reader.
+ */
+@Tags({"lookup", "enrich", "couchbase"})
+@CapabilityDescription("Lookup a record from Couchbase Server associated with the specified key. The coordinates that are passed to the lookup must contain the key 'key'.")
+public class CouchbaseRecordLookupService extends AbstractCouchbaseService implements RecordLookupService {
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("Record Reader")
+            .description("The Record Reader to use for parsing fetched document from Couchbase Server")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = List.of(
+            COUCHBASE_CONNECTION_SERVICE,
+            BUCKET_NAME,
+            SCOPE_NAME,
+            COLLECTION_NAME,
+            RECORD_READER
+    );
+
+    // Assigned in onEnabled and read by lookup, hence volatile.
+    private volatile RecordReaderFactory readerFactory;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    /**
+     * Runs the parent enablement logic and resolves the configured Record Reader factory.
+     *
+     * @param context configuration context holding the service properties
+     */
+    @Override
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        super.onEnabled(context);
+        readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+    }
+
+    /**
+     * Looks up the document identified by the {@code KEY} coordinate and returns its first record.
+     *
+     * @param coordinates lookup coordinates; the value stored under {@code KEY} is used as Document ID
+     * @return the first record parsed from the document, or empty when the key coordinate is missing,
+     *         the document does not exist, or the reader yields no record
+     * @throws LookupFailureException if fetching the document or parsing its content fails
+     */
+    @Override
+    public Optional<Record> lookup(final Map<String, Object> coordinates) throws LookupFailureException {
+        final Object documentId = coordinates.get(KEY);
+
+        if (documentId == null) {
+            return Optional.empty();
+        }
+
+        final CouchbaseGetResult result;
+        try {
+            result = couchbaseClient.getDocument(documentId.toString());
+        } catch (final CouchbaseDocNotFoundException e) {
+            // A missing document is a regular "no match" outcome, not a lookup failure.
+            return Optional.empty();
+        } catch (final Exception e) {
+            throw new LookupFailureException("Failed to look up record with Document ID [%s] in Couchbase.".formatted(documentId), e);
+        }
+
+        try (final InputStream input = new ByteArrayInputStream(result.resultContent())) {
+            final long inputLength = result.resultContent().length;
+            // The reader factory expects String variables, so every coordinate value is stringified.
+            final Map<String, String> stringMap = coordinates.entrySet().stream()
+                    .collect(Collectors.toMap(
+                            Map.Entry::getKey,
+                            e -> String.valueOf(e.getValue())
+                    ));
+
+            // The reader is closed once the first record has been read so it cannot leak resources.
+            try (final RecordReader recordReader = readerFactory.createRecordReader(stringMap, input, inputLength, getLogger())) {
+                return Optional.ofNullable(recordReader.nextRecord());
+            }
+        } catch (final Exception e) {
+            throw new LookupFailureException("Failed to parse the looked-up record with Document ID [%s]".formatted(documentId), e);
+        }
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 00000000000..772920f46fe
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.services.couchbase.CouchbaseKeyValueLookupService
+org.apache.nifi.services.couchbase.CouchbaseRecordLookupService
+org.apache.nifi.services.couchbase.CouchbaseMapCacheClient
diff --git 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/CouchbaseClient.java
 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/AbstractCouchbaseServiceTest.java
similarity index 50%
copy from 
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/CouchbaseClient.java
copy to 
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/AbstractCouchbaseServiceTest.java
index 9ad04fbe01d..5a631c176a3 100644
--- 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/CouchbaseClient.java
+++ 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/AbstractCouchbaseServiceTest.java
@@ -16,16 +16,21 @@
  */
 package org.apache.nifi.services.couchbase;
 
-import org.apache.nifi.services.couchbase.exception.CouchbaseException;
-import org.apache.nifi.services.couchbase.exception.ExceptionCategory;
-import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult;
-import org.apache.nifi.services.couchbase.utils.CouchbaseUpsertResult;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
-public interface CouchbaseClient {
+abstract class AbstractCouchbaseServiceTest {
 
-    CouchbaseGetResult getDocument(String documentId) throws 
CouchbaseException;
+    protected static final String CONNECTION_SERVICE_ID = 
"couchbaseConnectionService";
+    protected static final String TEST_DOCUMENT_ID = "test-document-id";
+    protected static final String TEST_DOCUMENT_CONTENT = 
"{\"key\":\"value\"}";
+    protected static final long TEST_CAS = 1L;
 
-    CouchbaseUpsertResult upsertDocument(String documentId, byte[] content) 
throws CouchbaseException;
-
-    ExceptionCategory getExceptionCategory(Throwable throwable);
+    /**
+     * Creates a Mockito mock of {@link CouchbaseConnectionService} for service tests.
+     * The mock reports {@code CONNECTION_SERVICE_ID} as its identifier and hands out the
+     * given client for any argument passed to {@code getClient}.
+     *
+     * @param client the Couchbase client the mocked connection service should return
+     * @return the stubbed connection service mock
+     */
+    protected static CouchbaseConnectionService 
mockConnectionService(CouchbaseClient client) {
+        final CouchbaseConnectionService connectionService = 
mock(CouchbaseConnectionService.class);
+        
when(connectionService.getIdentifier()).thenReturn(CONNECTION_SERVICE_ID);
+        when(connectionService.getClient(any())).thenReturn(client);
+        return connectionService;
+    }
 }
diff --git 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupServiceTest.java
 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupServiceTest.java
new file mode 100644
index 00000000000..9ccdb5ef8de
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupServiceTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.lookup.LookupFailureException;
+import 
org.apache.nifi.services.couchbase.exception.CouchbaseDocNotFoundException;
+import org.apache.nifi.services.couchbase.exception.CouchbaseException;
+import org.apache.nifi.services.couchbase.utils.CouchbaseLookupInResult;
+import org.apache.nifi.util.MockConfigurationContext;
+import org.apache.nifi.util.MockControllerServiceInitializationContext;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.nifi.services.couchbase.AbstractCouchbaseService.COUCHBASE_CONNECTION_SERVICE;
+import static org.apache.nifi.services.couchbase.AbstractCouchbaseService.KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link CouchbaseKeyValueLookupService} running against a mocked {@link CouchbaseClient}.
+ */
+class CouchbaseKeyValueLookupServiceTest extends AbstractCouchbaseServiceTest {
+
+    private static final String LOOKUP_RESULT = "test result";
+
+    private CouchbaseKeyValueLookupService lookupService;
+    private CouchbaseClient client;
+
+    /** Enables a fresh lookup service wired to a mocked connection service before every test. */
+    @BeforeEach
+    void init() {
+        client = mock(CouchbaseClient.class);
+        final CouchbaseConnectionService connectionService = mockConnectionService(client);
+
+        final MockControllerServiceInitializationContext initializationContext =
+                new MockControllerServiceInitializationContext(connectionService, CONNECTION_SERVICE_ID);
+        final Map<PropertyDescriptor, String> serviceProperties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, CONNECTION_SERVICE_ID);
+        final MockConfigurationContext configurationContext = new MockConfigurationContext(serviceProperties, initializationContext, new HashMap<>());
+
+        lookupService = new CouchbaseKeyValueLookupService();
+        lookupService.onEnabled(configurationContext);
+    }
+
+    /** A successful sub-document lookup is returned as the lookup value. */
+    @Test
+    void testSuccessfulLookup() throws CouchbaseException, LookupFailureException {
+        when(client.lookupIn(anyString(), any())).thenReturn(new CouchbaseLookupInResult(LOOKUP_RESULT, TEST_CAS));
+
+        final Optional<String> lookedUp = lookupService.lookup(documentCoordinates());
+
+        assertTrue(lookedUp.isPresent());
+        assertEquals(LOOKUP_RESULT, lookedUp.get());
+    }
+
+    /** A generic client failure surfaces as a LookupFailureException. */
+    @Test
+    void testLookupFailure() throws CouchbaseException {
+        when(client.lookupIn(anyString(), any())).thenThrow(new CouchbaseException("Test exception"));
+
+        final Map<String, Object> coordinates = documentCoordinates();
+
+        assertThrows(LookupFailureException.class, () -> lookupService.lookup(coordinates));
+    }
+
+    /** A missing document yields an empty result instead of an error. */
+    @Test
+    void testDocumentNotFoundInLookup() throws CouchbaseException, LookupFailureException {
+        when(client.lookupIn(anyString(), any())).thenThrow(new CouchbaseDocNotFoundException("Test doc not found exception", null));
+
+        final Optional<String> lookedUp = lookupService.lookup(documentCoordinates());
+
+        assertTrue(lookedUp.isEmpty());
+    }
+
+    /** Coordinates without the key entry yield an empty result. */
+    @Test
+    void testMissingKey() throws LookupFailureException {
+        final Map<String, Object> noCoordinates = Collections.emptyMap();
+
+        final Optional<String> lookedUp = lookupService.lookup(noCoordinates);
+
+        assertTrue(lookedUp.isEmpty());
+    }
+
+    /** Builds lookup coordinates that point at the shared test document. */
+    private static Map<String, Object> documentCoordinates() {
+        return Collections.singletonMap(KEY, TEST_DOCUMENT_ID);
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClientTest.java
 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClientTest.java
new file mode 100644
index 00000000000..45475cfcf25
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClientTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import 
org.apache.nifi.services.couchbase.exception.CouchbaseDocNotFoundException;
+import org.apache.nifi.services.couchbase.exception.CouchbaseException;
+import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult;
+import org.apache.nifi.services.couchbase.utils.CouchbaseUpsertResult;
+import org.apache.nifi.util.MockConfigurationContext;
+import org.apache.nifi.util.MockControllerServiceInitializationContext;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.nifi.services.couchbase.AbstractCouchbaseService.COUCHBASE_CONNECTION_SERVICE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link CouchbaseMapCacheClient} running against a mocked {@link CouchbaseClient}.
+ * All String/byte conversions use UTF-8 explicitly so the fixtures match the serializer and
+ * deserializer regardless of the platform default charset.
+ */
+class CouchbaseMapCacheClientTest extends AbstractCouchbaseServiceTest {
+
+    private final Serializer<String> stringSerializer = (value, output) -> output.write(value.getBytes(StandardCharsets.UTF_8));
+    private final Deserializer<String> stringDeserializer = input -> new String(input, StandardCharsets.UTF_8);
+    private CouchbaseMapCacheClient mapCacheClient;
+    private CouchbaseClient client;
+
+    /** Enables a fresh cache client wired to a mocked connection service before every test. */
+    @BeforeEach
+    void init() {
+        mapCacheClient = new CouchbaseMapCacheClient();
+        client = mock(CouchbaseClient.class);
+
+        final CouchbaseConnectionService connectionService = mockConnectionService(client);
+        final MockControllerServiceInitializationContext serviceInitializationContext =
+                new MockControllerServiceInitializationContext(connectionService, CONNECTION_SERVICE_ID);
+        final Map<PropertyDescriptor, String> properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, CONNECTION_SERVICE_ID);
+        final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>());
+
+        mapCacheClient.onEnabled(context);
+    }
+
+    /** A fetched document is deserialized and returned as the cache value. */
+    @Test
+    void testCacheGet() throws CouchbaseException, IOException {
+        final byte[] documentContent = TEST_DOCUMENT_CONTENT.getBytes(StandardCharsets.UTF_8);
+        when(client.getDocument(anyString())).thenReturn(new CouchbaseGetResult(documentContent, TEST_CAS));
+
+        final String result = mapCacheClient.get(TEST_DOCUMENT_ID, stringSerializer, stringDeserializer);
+
+        assertEquals(TEST_DOCUMENT_CONTENT, result);
+    }
+
+    /** A generic client failure on get surfaces as an IOException. */
+    @Test
+    void testCacheGetFailure() throws CouchbaseException {
+        when(client.getDocument(anyString())).thenThrow(new CouchbaseException("Test exception", null));
+
+        assertThrows(IOException.class, () -> mapCacheClient.get(TEST_DOCUMENT_ID, stringSerializer, stringDeserializer));
+    }
+
+    /** A missing document is reported as a null cache value. */
+    @Test
+    void testCacheGetNotFound() throws CouchbaseException, IOException {
+        when(client.getDocument(anyString())).thenThrow(new CouchbaseDocNotFoundException("Test doc not found exception", null));
+
+        final String result = mapCacheClient.get(TEST_DOCUMENT_ID, stringSerializer, stringDeserializer);
+
+        assertNull(result);
+    }
+
+    /** A put upserts the serialized value under the serialized key exactly once. */
+    @Test
+    void testCachePut() throws CouchbaseException, IOException {
+        when(client.upsertDocument(anyString(), any())).thenReturn(new CouchbaseUpsertResult(TEST_CAS));
+
+        mapCacheClient.put(TEST_DOCUMENT_ID, TEST_DOCUMENT_CONTENT, stringSerializer, stringSerializer);
+
+        final byte[] expectedContent = TEST_DOCUMENT_CONTENT.getBytes(StandardCharsets.UTF_8);
+        verify(client, times(1)).upsertDocument(eq(TEST_DOCUMENT_ID), eq(expectedContent));
+    }
+
+    /** A generic client failure on put surfaces as an IOException. */
+    @Test
+    void testCachePutFailure() throws CouchbaseException {
+        when(client.upsertDocument(anyString(), any())).thenThrow(new CouchbaseException("Test exception"));
+
+        assertThrows(IOException.class, () -> mapCacheClient.put(TEST_DOCUMENT_ID, TEST_DOCUMENT_CONTENT, stringSerializer, stringSerializer));
+    }
+
+    /** A remove deletes the document with the serialized key exactly once. */
+    @Test
+    void testCacheRemove() throws CouchbaseException, IOException {
+        mapCacheClient.remove(TEST_DOCUMENT_ID, stringSerializer);
+
+        verify(client, times(1)).removeDocument(eq(TEST_DOCUMENT_ID));
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupServiceTest.java
 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupServiceTest.java
new file mode 100644
index 00000000000..d55f5cdb697
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupServiceTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase;
+
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.services.couchbase.exception.CouchbaseException;
+import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.nifi.services.couchbase.AbstractCouchbaseService.COUCHBASE_CONNECTION_SERVICE;
+import static org.apache.nifi.services.couchbase.AbstractCouchbaseService.KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link CouchbaseRecordLookupService} using a mocked {@link CouchbaseClient}
+ * and a {@link MockRecordParser} as the Record Reader.
+ */
+class CouchbaseRecordLookupServiceTest extends AbstractCouchbaseServiceTest {
+
+    private static final String LOOKUP_SERVICE_ID = "lookupService";
+    private static final String RECORD_READER_ID = "recordReaderService";
+
+    private CouchbaseRecordLookupService lookupService;
+    private CouchbaseClient client;
+
+    /** Registers and enables the connection service, the record reader and the lookup service. */
+    @BeforeEach
+    void setup() throws InitializationException {
+        final TestRunner testRunner = TestRunners.newTestRunner(NoOpProcessor.class);
+        lookupService = new CouchbaseRecordLookupService();
+        client = mock(CouchbaseClient.class);
+
+        final CouchbaseConnectionService couchbaseService = mockConnectionService(client);
+
+        // The mock reader always yields one record with the single field key=value.
+        final MockRecordParser recordParser = new MockRecordParser();
+        recordParser.addSchemaField("key", RecordFieldType.STRING);
+        recordParser.addRecord("value");
+
+        testRunner.addControllerService(CONNECTION_SERVICE_ID, couchbaseService);
+        testRunner.addControllerService(RECORD_READER_ID, recordParser);
+        testRunner.addControllerService(LOOKUP_SERVICE_ID, lookupService);
+
+        testRunner.enableControllerService(couchbaseService);
+        testRunner.enableControllerService(recordParser);
+
+        testRunner.setProperty(lookupService, COUCHBASE_CONNECTION_SERVICE, CONNECTION_SERVICE_ID);
+        testRunner.setProperty(lookupService, CouchbaseRecordLookupService.RECORD_READER, RECORD_READER_ID);
+
+        testRunner.enableControllerService(lookupService);
+    }
+
+    /** A fetched document is parsed by the record reader and returned as a record. */
+    @Test
+    void testSuccessfulLookup() throws LookupFailureException, CouchbaseException {
+        final CouchbaseGetResult getResult = new CouchbaseGetResult(TEST_DOCUMENT_CONTENT.getBytes(), TEST_CAS);
+        when(client.getDocument(anyString())).thenReturn(getResult);
+
+        final Optional<Record> lookedUp = lookupService.lookup(documentCoordinates());
+
+        assertTrue(lookedUp.isPresent());
+
+        final List<RecordField> expectedFields = Collections.singletonList(new RecordField("key", RecordFieldType.STRING.getDataType()));
+        final Map<String, Object> expectedValues = Collections.singletonMap("key", "value");
+        final Record expectedRecord = new MapRecord(new SimpleRecordSchema(expectedFields), expectedValues);
+
+        assertEquals(expectedRecord, lookedUp.get());
+    }
+
+    /** A generic client failure surfaces as a LookupFailureException. */
+    @Test
+    void testLookupFailure() throws CouchbaseException {
+        when(client.getDocument(anyString())).thenThrow(new CouchbaseException("Test exception"));
+
+        final Map<String, Object> coordinates = documentCoordinates();
+
+        assertThrows(LookupFailureException.class, () -> lookupService.lookup(coordinates));
+    }
+
+    /** Coordinates without the key entry yield an empty result. */
+    @Test
+    void testMissingKey() throws LookupFailureException {
+        final Map<String, Object> noCoordinates = Collections.emptyMap();
+
+        final Optional<Record> lookedUp = lookupService.lookup(noCoordinates);
+
+        assertTrue(lookedUp.isEmpty());
+    }
+
+    /** Builds lookup coordinates that point at the shared test document. */
+    private static Map<String, Object> documentCoordinates() {
+        return Collections.singletonMap(KEY, TEST_DOCUMENT_ID);
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseClient.java
 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseClient.java
index dce98be9196..e5b7e436999 100644
--- 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseClient.java
+++ 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseClient.java
@@ -45,20 +45,34 @@ import com.couchbase.client.java.Collection;
 import com.couchbase.client.java.codec.RawBinaryTranscoder;
 import com.couchbase.client.java.codec.RawJsonTranscoder;
 import com.couchbase.client.java.codec.Transcoder;
+import com.couchbase.client.java.json.JsonArray;
+import com.couchbase.client.java.json.JsonObject;
+import com.couchbase.client.java.kv.ExistsResult;
 import com.couchbase.client.java.kv.GetOptions;
 import com.couchbase.client.java.kv.GetResult;
+import com.couchbase.client.java.kv.InsertOptions;
+import com.couchbase.client.java.kv.LookupInResult;
+import com.couchbase.client.java.kv.LookupInSpec;
 import com.couchbase.client.java.kv.MutationResult;
 import com.couchbase.client.java.kv.PersistTo;
+import com.couchbase.client.java.kv.ReplaceOptions;
 import com.couchbase.client.java.kv.ReplicateTo;
 import com.couchbase.client.java.kv.UpsertOptions;
+import 
org.apache.nifi.services.couchbase.exception.CouchbaseCasMismatchException;
+import 
org.apache.nifi.services.couchbase.exception.CouchbaseDocExistsException;
+import 
org.apache.nifi.services.couchbase.exception.CouchbaseDocNotFoundException;
 import org.apache.nifi.services.couchbase.exception.CouchbaseException;
 import org.apache.nifi.services.couchbase.exception.ExceptionCategory;
 import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult;
+import org.apache.nifi.services.couchbase.utils.CouchbaseLookupInResult;
 import org.apache.nifi.services.couchbase.utils.CouchbaseUpsertResult;
 import org.apache.nifi.services.couchbase.utils.DocumentType;
 import org.apache.nifi.services.couchbase.utils.JsonValidator;
 
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.function.Predicate;
 
@@ -104,7 +118,7 @@ class StandardCouchbaseClient implements CouchbaseClient {
             entry(ValueTooLargeException.class, FAILURE)
     );
 
-    StandardCouchbaseClient(Collection collection, DocumentType documentType, 
PersistTo persistTo, ReplicateTo replicateTo) {
+    StandardCouchbaseClient(final Collection collection, final DocumentType 
documentType, final PersistTo persistTo, final ReplicateTo replicateTo) {
         this.collection = collection;
         this.documentType = documentType;
         this.persistTo = persistTo;
@@ -112,23 +126,25 @@ class StandardCouchbaseClient implements CouchbaseClient {
     }
 
     @Override
-    public CouchbaseGetResult getDocument(String documentId) throws 
CouchbaseException {
+    public CouchbaseGetResult getDocument(final String documentId) throws 
CouchbaseException {
         try {
             final GetResult result = collection.get(documentId, 
GetOptions.getOptions().transcoder(getTranscoder(documentType)));
 
             return new CouchbaseGetResult(result.contentAsBytes(), 
result.cas());
-        } catch (Exception e) {
+        } catch (final DocumentNotFoundException e) {
+            throw new CouchbaseDocNotFoundException("Couchbase document with 
key [%s] not found".formatted(documentId), e);
+        } catch (final Exception e) {
             throw new CouchbaseException("Failed to get document [%s] from 
Couchbase".formatted(documentId), e);
         }
     }
 
     @Override
-    public CouchbaseUpsertResult upsertDocument(String documentId, byte[] 
content) throws CouchbaseException {
-        try {
-            if (!getInputValidator(documentType).test(content)) {
-                throw new CouchbaseException("The provided input is invalid");
-            }
+    public CouchbaseUpsertResult upsertDocument(final String documentId, final 
byte[] content) throws CouchbaseException {
+        if (!getInputValidator(documentType).test(content)) {
+            throw new CouchbaseException("The provided input is invalid for 
document [%s]".formatted(documentId));
+        }
 
+        try {
             final MutationResult result = collection.upsert(documentId, 
content,
                     UpsertOptions.upsertOptions()
                             .durability(persistTo, replicateTo)
@@ -136,27 +152,126 @@ class StandardCouchbaseClient implements CouchbaseClient 
{
                             .clientContext(new HashMap<>()));
 
             return new CouchbaseUpsertResult(result.cas());
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new CouchbaseException("Failed to upsert document [%s] in 
Couchbase".formatted(documentId), e);
         }
     }
 
     @Override
-    public ExceptionCategory getExceptionCategory(Throwable throwable) {
+    public boolean documentExists(final String documentId) throws CouchbaseException {
+        // Asks the collection whether a document is stored under the given id;
+        // any failure of that call is rethrown as a CouchbaseException naming the document.
+        try {
+            return collection.exists(documentId).exists();
+        } catch (final Exception e) {
+            throw new CouchbaseException("Failed to check document [%s] in Couchbase".formatted(documentId), e);
+        }
+    }
+
+    /**
+     * Inserts a document under the given id.
+     * The content is checked with the input validator of the configured document type before
+     * the collection is contacted. The insert uses the configured persistence and replication
+     * constraints and the transcoder matching the document type.
+     *
+     * @param documentId id of the document to insert
+     * @param content raw document content
+     * @throws CouchbaseDocExistsException if the SDK reports that the document already exists
+     * @throws CouchbaseException if the content fails validation or the insert fails for any other reason
+     */
+    @Override
+    public void insertDocument(final String documentId, final byte[] content) throws CouchbaseException {
+        final boolean validInput = getInputValidator(documentType).test(content);
+        if (!validInput) {
+            throw new CouchbaseException("The provided input is invalid for document [%s]".formatted(documentId));
+        }
+
+        try {
+            final InsertOptions insertOptions = InsertOptions.insertOptions()
+                    .durability(persistTo, replicateTo)
+                    .transcoder(getTranscoder(documentType))
+                    .clientContext(new HashMap<>());
+
+            collection.insert(documentId, content, insertOptions);
+        } catch (final DocumentExistsException e) {
+            throw new CouchbaseDocExistsException("Document with key [%s] already exists".formatted(documentId), e);
+        } catch (final Exception e) {
+            throw new CouchbaseException("Failed to insert document [%s] in Couchbase".formatted(documentId), e);
+        }
+    }
+
+    /**
+     * Removes the document stored under the given id.
+     * The removal applies the configured persistence and replication constraints, matching the
+     * insert, replace and upsert operations of this client.
+     *
+     * @param documentId id of the document to remove
+     * @throws CouchbaseDocNotFoundException if the SDK reports that the document does not exist
+     * @throws CouchbaseException if the removal fails for any other reason
+     */
+    @Override
+    public void removeDocument(final String documentId) throws CouchbaseException {
+        try {
+            // RemoveOptions is referenced by its fully qualified name so that this change
+            // does not depend on an additional import.
+            collection.remove(documentId,
+                    com.couchbase.client.java.kv.RemoveOptions.removeOptions()
+                            .durability(persistTo, replicateTo));
+        } catch (final DocumentNotFoundException e) {
+            throw new CouchbaseDocNotFoundException("Couchbase document with key [%s] not found".formatted(documentId), e);
+        } catch (final Exception e) {
+            throw new CouchbaseException("Failed to remove document [%s] in Couchbase".formatted(documentId), e);
+        }
+    }
+
+    /**
+     * Replaces the content of an existing document, guarded by a CAS value.
+     * The content is checked with the input validator of the configured document type before
+     * the collection is contacted. The replace uses the given CAS, the configured persistence
+     * and replication constraints and the transcoder matching the document type.
+     *
+     * @param documentId id of the document to replace
+     * @param content new raw document content
+     * @param cas CAS value passed to the replace operation
+     * @throws CouchbaseCasMismatchException if the SDK reports a CAS mismatch
+     * @throws CouchbaseDocNotFoundException if the SDK reports that the document does not exist
+     * @throws CouchbaseException if the content fails validation or the replace fails for any other reason
+     */
+    @Override
+    public void replaceDocument(final String documentId, final byte[] content, final long cas) throws CouchbaseException {
+        final boolean validInput = getInputValidator(documentType).test(content);
+        if (!validInput) {
+            throw new CouchbaseException("The provided input is invalid for document [%s]".formatted(documentId));
+        }
+
+        try {
+            final ReplaceOptions replaceOptions = ReplaceOptions.replaceOptions()
+                    .cas(cas)
+                    .durability(persistTo, replicateTo)
+                    .transcoder(getTranscoder(documentType))
+                    .clientContext(new HashMap<>());
+
+            collection.replace(documentId, content, replaceOptions);
+        } catch (final CasMismatchException e) {
+            throw new CouchbaseCasMismatchException("Couchbase document with key [%s] has been concurrently modified".formatted(documentId), e);
+        } catch (final DocumentNotFoundException e) {
+            throw new CouchbaseDocNotFoundException("Couchbase document with key [%s] not found".formatted(documentId), e);
+        } catch (final Exception e) {
+            throw new CouchbaseException("Failed to replace document [%s] in Couchbase".formatted(documentId), e);
+        }
+    }
+
+    /**
+     * Reads the value at a single sub-document path of a document and returns it as text
+     * together with the CAS of the lookup result.
+     * A {@code null} path is sent as the empty path. The value is decoded as {@code Object}
+     * first; if that decoding fails it is read as a byte array instead.
+     *
+     * @param documentId id of the document to read from
+     * @param subDocPath path inside the document, may be {@code null}
+     * @return the value at the path rendered as a string, with the CAS of the lookup result
+     * @throws CouchbaseDocNotFoundException if the SDK reports that the document does not exist
+     * @throws CouchbaseException on any other failure; when no value exists on the path, the
+     *         "No value found" exception is the cause of the exception that reaches the caller
+     */
+    @Override
+    public CouchbaseLookupInResult lookupIn(final String documentId, final String subDocPath) throws CouchbaseException {
+        try {
+            final String documentPath = subDocPath == null ? "" : subDocPath;
+            final List<LookupInSpec> specs = Collections.singletonList(LookupInSpec.get(documentPath));
+            final LookupInResult lookupResult = collection.lookupIn(documentId, specs);
+
+            final boolean valuePresent = lookupResult.exists(0);
+            if (!valuePresent) {
+                // Raised inside the try block on purpose: the generic handler below wraps it,
+                // so callers find this message on the cause.
+                throw new CouchbaseException("No value found on the requested path [%s] in Couchbase".formatted(subDocPath));
+            }
+
+            Object decodedContent;
+            try {
+                decodedContent = lookupResult.contentAs(0, Object.class);
+            } catch (final DecodingFailureException e) {
+                // Fall back to the raw bytes when the value cannot be decoded as an object
+                decodedContent = lookupResult.contentAs(0, byte[].class);
+            }
+
+            final String textContent = deserializeLookupInResult(decodedContent);
+            return new CouchbaseLookupInResult(textContent, lookupResult.cas());
+        } catch (final DocumentNotFoundException e) {
+            throw new CouchbaseDocNotFoundException("Couchbase document with key [%s] not found".formatted(documentId), e);
+        } catch (final Exception e) {
+            throw new CouchbaseException("Failed to look up in document [%s] in Couchbase".formatted(documentId), e);
+        }
+    }
+
+    @Override
+    public ExceptionCategory getExceptionCategory(final Throwable throwable) {
         return exceptionMapping.getOrDefault(throwable.getClass(), FAILURE);
     }
 
-    private Transcoder getTranscoder(DocumentType documentType) {
+    private Transcoder getTranscoder(final DocumentType documentType) {
         return switch (documentType) {
             case JSON -> RawJsonTranscoder.INSTANCE;
             case BINARY -> RawBinaryTranscoder.INSTANCE;
         };
     }
 
-    private Predicate<byte[]> getInputValidator(DocumentType documentType) {
+    private Predicate<byte[]> getInputValidator(final DocumentType 
documentType) {
         return switch (documentType) {
             case JSON -> new JsonValidator();
             case BINARY -> v -> true;
         };
     }
+
+    /**
+     * Renders a decoded lookup value as a string.
+     * Strings are returned unchanged, maps and lists are rendered through {@code JsonObject}
+     * and {@code JsonArray}, byte arrays are decoded as UTF-8, and any other value is
+     * rendered with {@code toString()}.
+     *
+     * @param result decoded value, may be {@code null}
+     * @return the string form of the value, or {@code null} when the value is {@code null}
+     */
+    private String deserializeLookupInResult(final Object result) {
+        return switch (result) {
+            case null -> null;
+            case byte[] rawBytes -> new String(rawBytes, StandardCharsets.UTF_8);
+            case String text -> text;
+            case Map fields -> JsonObject.from(fields).toString();
+            case List values -> JsonArray.from(values).toString();
+            default -> result.toString();
+        };
+    }
 }
diff --git 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseConnectionService.java
 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseConnectionService.java
index beb516daeaf..92a31329c40 100644
--- 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseConnectionService.java
+++ 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseConnectionService.java
@@ -58,7 +58,7 @@ public class StandardCouchbaseConnectionService extends 
AbstractControllerServic
 
     public static final PropertyDescriptor USERNAME = new 
PropertyDescriptor.Builder()
             .name("Username")
-            .description("The username to authenticate to the Couchbase 
client.")
+            .description("The username to authenticate to the Couchbase 
client")
             .required(true)
             .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@@ -66,7 +66,7 @@ public class StandardCouchbaseConnectionService extends 
AbstractControllerServic
 
     public static final PropertyDescriptor PASSWORD = new 
PropertyDescriptor.Builder()
             .name("Password")
-            .description("The user's password to authenticate to the Couchbase 
client.")
+            .description("The user's password to authenticate to the Couchbase 
client")
             .required(true)
             .sensitive(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@@ -74,13 +74,13 @@ public class StandardCouchbaseConnectionService extends 
AbstractControllerServic
 
     public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder()
             .name("SSL Context Service")
-            .description("Service supporting SSL communication configuration. 
The service is using one-way SSL, so only the trust store properties will be 
used.")
+            .description("Service supporting SSL communication configuration. 
The service is using one-way SSL, so only the trust store properties will be 
used")
             .identifiesControllerService(SSLContextService.class)
             .build();
 
     public static final PropertyDescriptor PERSISTENCE_STRATEGY = new 
PropertyDescriptor.Builder()
             .name("Persistence Strategy")
-            .description("Durability constraint about disk persistence.")
+            .description("Durability constraint about disk persistence")
             .required(true)
             .allowableValues(PersistTo.values())
             .defaultValue(PersistTo.NONE.toString())
@@ -88,7 +88,7 @@ public class StandardCouchbaseConnectionService extends 
AbstractControllerServic
 
     public static final PropertyDescriptor REPLICATION_STRATEGY = new 
PropertyDescriptor.Builder()
             .name("Replication Strategy")
-            .description("Durability constraint about replication.")
+            .description("Durability constraint about replication")
             .required(true)
             .allowableValues(ReplicateTo.values())
             .defaultValue(ReplicateTo.NONE.toString())
diff --git 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/TestCouchbaseClient.java
 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseClientTest.java
similarity index 50%
rename from 
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/TestCouchbaseClient.java
rename to 
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseClientTest.java
index ad3d7e7d878..62a9bc064b4 100644
--- 
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/TestCouchbaseClient.java
+++ 
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseClientTest.java
@@ -18,16 +18,23 @@ package org.apache.nifi.services.couchbase;
 
 import com.couchbase.client.java.Collection;
 import com.couchbase.client.java.kv.GetResult;
+import com.couchbase.client.java.kv.LookupInResult;
 import com.couchbase.client.java.kv.MutationResult;
 import com.couchbase.client.java.kv.PersistTo;
 import com.couchbase.client.java.kv.ReplicateTo;
 import org.apache.nifi.services.couchbase.exception.CouchbaseException;
 import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult;
+import org.apache.nifi.services.couchbase.utils.CouchbaseLookupInResult;
 import org.apache.nifi.services.couchbase.utils.CouchbaseUpsertResult;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
 import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 import static org.apache.nifi.services.couchbase.utils.DocumentType.JSON;
@@ -36,11 +43,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-public class TestCouchbaseClient {
+public class CouchbaseClientTest {
 
     private static final String TEST_DOCUMENT_ID = "test-document-id";
     private static final long TEST_CAS = 1L;
@@ -72,7 +80,24 @@ public class TestCouchbaseClient {
         final StandardCouchbaseClient client = new 
StandardCouchbaseClient(collection, JSON, PersistTo.ONE, ReplicateTo.ONE);
 
         final Exception exception = assertThrows(CouchbaseException.class, () 
-> client.upsertDocument(TEST_DOCUMENT_ID, content.getBytes()));
+        assertTrue(exception.getMessage().contains("The provided input is 
invalid"));
+    }
+
+    // Inserting content that is not valid JSON through a JSON client must be rejected with the validation message.
+    @Test
+    void testInsertJsonDocumentValidationFailure() {
+        final StandardCouchbaseClient client = new StandardCouchbaseClient(collection, JSON, PersistTo.ONE, ReplicateTo.ONE);
+        final byte[] invalidJson = "{invalid-json}".getBytes();
+
+        final CouchbaseException thrown = assertThrows(CouchbaseException.class, () -> client.insertDocument(TEST_DOCUMENT_ID, invalidJson));
+
+        assertTrue(thrown.getMessage().contains("The provided input is invalid"));
+    }
+
+    @Test
+    void testReplaceJsonDocumentValidationFailure() {
+        final String content = "{invalid-json}";
+        final StandardCouchbaseClient client = new 
StandardCouchbaseClient(collection, JSON, PersistTo.ONE, ReplicateTo.ONE);
 
+        final Exception exception = assertThrows(CouchbaseException.class, () 
-> client.replaceDocument(TEST_DOCUMENT_ID, content.getBytes(), TEST_CAS));
         assertTrue(exception.getMessage().contains("The provided input is 
invalid"));
     }
 
@@ -94,4 +119,61 @@ public class TestCouchbaseClient {
         assertEquals(TEST_CAS, getResult.cas());
         assertArrayEquals(content.getBytes(), getResult.resultContent());
     }
+
+    // A map returned by the lookup must come back as a JSON object string together with the CAS.
+    @Test
+    void testLookupInWithMapResult() throws CouchbaseException {
+        final Map<String, String> person = new HashMap<>();
+        person.put("name", "John");
+        person.put("age", "20");
+
+        final LookupInResult mockedResult = mock(LookupInResult.class);
+        when(mockedResult.exists(anyInt())).thenReturn(true);
+        when(mockedResult.cas()).thenReturn(TEST_CAS);
+        when(mockedResult.contentAs(anyInt(), any(Class.class))).thenReturn(person);
+        when(collection.lookupIn(anyString(), any())).thenReturn(mockedResult);
+
+        final StandardCouchbaseClient client = new StandardCouchbaseClient(collection, JSON, PersistTo.ONE, ReplicateTo.ONE);
+        final CouchbaseLookupInResult actual = client.lookupIn(TEST_DOCUMENT_ID, "");
+
+        assertEquals("{\"name\":\"John\",\"age\":\"20\"}", actual.resultContent());
+        assertEquals(TEST_CAS, actual.cas());
+    }
+
+    // A list returned by the lookup must come back as a JSON array string together with the CAS.
+    @Test
+    void testLookupInWithArrayResult() throws CouchbaseException {
+        final List<Object> people = new ArrayList<>();
+        people.add(Collections.singletonMap("name", "John"));
+        people.add(Collections.singletonMap("name", "Jack"));
+
+        final LookupInResult mockedResult = mock(LookupInResult.class);
+        when(mockedResult.exists(anyInt())).thenReturn(true);
+        when(mockedResult.cas()).thenReturn(TEST_CAS);
+        when(mockedResult.contentAs(anyInt(), any(Class.class))).thenReturn(people);
+        when(collection.lookupIn(anyString(), any())).thenReturn(mockedResult);
+
+        final StandardCouchbaseClient client = new StandardCouchbaseClient(collection, JSON, PersistTo.ONE, ReplicateTo.ONE);
+        final CouchbaseLookupInResult actual = client.lookupIn(TEST_DOCUMENT_ID, "");
+
+        assertEquals("[{\"name\":\"John\"},{\"name\":\"Jack\"}]", actual.resultContent());
+        assertEquals(TEST_CAS, actual.cas());
+    }
+
+    // When the lookup result reports no value on the path, the path-specific message is carried by the cause.
+    @Test
+    void testLookupInWithNoResult() {
+        final LookupInResult emptyResult = mock(LookupInResult.class);
+        when(emptyResult.exists(anyInt())).thenReturn(false);
+        when(collection.lookupIn(anyString(), any())).thenReturn(emptyResult);
+
+        final StandardCouchbaseClient client = new StandardCouchbaseClient(collection, JSON, PersistTo.ONE, ReplicateTo.ONE);
+
+        final CouchbaseException thrown = assertThrows(CouchbaseException.class, () -> client.lookupIn(TEST_DOCUMENT_ID, "test-path"));
+
+        final String causeMessage = thrown.getCause().getMessage();
+        assertTrue(causeMessage.contains("No value found on the requested path [test-path] in Couchbase"));
+    }
 }
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/pom.xml 
b/nifi-extension-bundles/nifi-couchbase-bundle/pom.xml
index f332f74ee15..33af5cee38b 100644
--- a/nifi-extension-bundles/nifi-couchbase-bundle/pom.xml
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/pom.xml
@@ -34,6 +34,7 @@
         <module>nifi-couchbase-standard-services-nar</module>
         <module>nifi-couchbase-services-api</module>
         <module>nifi-couchbase-services-api-nar</module>
+        <module>nifi-couchbase-services</module>
     </modules>
 
 </project>
\ No newline at end of file

Reply via email to